FedERA
FedERA is a highly dynamic and customizable framework that accommodates many use cases by implementing several functionalities on top of different federated learning algorithms, creating a plug-and-play architecture.
Check out the Overview of FedERA section for further information, and the Installation section for how to install the project.
Note
This project is under active development.
Contents
Overview of FedERA
Introduction
Federated learning is a machine learning technique for training models on distributed data without sharing that data. In traditional machine learning, large datasets must first be collected and sent to one location, where they are combined before the model is trained on them. This process raises privacy concerns, because sensitive personal data may be exposed. Federated learning addresses these concerns by keeping each user's data local while still allowing powerful statistical analysis that can be used to build accurate models at scale.
FedAvg is one of the foundational blocks of federated learning. A single communication round of FedAvg includes:
Waiting for a number of clients to connect to a server (Step 0)
Sending the clients a global model (Step 1)
Training the model on each client with locally available data (Step 2)
Sending the trained models back to the server (Step 3)
The server then averages the weights of the models and calculates a new aggregated model. This process constitutes a single communication round and several such communication rounds occur to train a model.
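The averaging step can be sketched in plain Python. This is a minimal illustration rather than FedERA's implementation: `average_weights` is a hypothetical helper, each client model is represented as a dictionary mapping parameter names to lists of floats instead of tensors, and the mean is unweighted.

```python
def average_weights(client_weights):
    """Return the element-wise mean of several clients' parameters.

    client_weights: list of dicts, each mapping a parameter name to a
    list of floats (a stand-in for a tensor).
    """
    if not client_weights:
        raise ValueError("at least one client model is required")
    averaged = {}
    for name in client_weights[0]:
        # Collect this parameter from every client and average position by position.
        columns = zip(*(weights[name] for weights in client_weights))
        averaged[name] = [sum(values) / len(client_weights) for values in columns]
    return averaged


# Two clients return their models after local training (Step 3).
client_a = {"layer.weight": [1.0, 2.0], "layer.bias": [0.0]}
client_b = {"layer.weight": [3.0, 4.0], "layer.bias": [1.0]}
global_model = average_weights([client_a, client_b])
```

The resulting `global_model` would be sent to the clients at the start of the next communication round.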

Overview
FedERA is a highly dynamic and customizable framework that accommodates many use cases by implementing several functionalities on top of vanilla FedAvg, creating a plug-and-play architecture.
Federated Learning

Establishing Connection between Server and Clients

Communication with clients

Fractional and random subsampling
The Client_Manager can be used to sample the already connected clients.
If a minimum number of clients is provided, the Client_Manager waits for that many clients to connect before returning a reference to them.
If a fraction is provided, the Client_Manager will return that fraction of available clients.
The Client_Manager can sample the clients based on their connection order or a random order. A function can also be provided to determine the selection of clients.
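The sampling options described above can be sketched as follows. This is an illustration only: `sample_clients` and its parameters are hypothetical names, not the Client_Manager API, and the sketch does not model waiting for a minimum number of clients.

```python
import random


def sample_clients(connected, fraction=None, randomize=False, selector=None, seed=None):
    """Pick a subset of already connected clients.

    connected: client identifiers in connection order.
    fraction:  share of clients to return (None returns all of them).
    randomize: shuffle before sampling instead of using connection order.
    selector:  optional callable that takes the list and returns the chosen clients.
    """
    if selector is not None:
        return list(selector(connected))
    pool = list(connected)
    if randomize:
        random.Random(seed).shuffle(pool)
    if fraction is None:
        return pool
    # Always keep at least one client when a fraction is requested.
    count = max(1, int(len(pool) * fraction))
    return pool[:count]


clients = ["client-0", "client-1", "client-2", "client-3"]
first_half = sample_clients(clients, fraction=0.5)
random_half = sample_clients(clients, fraction=0.5, randomize=True, seed=0)
even_only = sample_clients(clients, selector=lambda cs: cs[::2])
```

Here `first_half` follows connection order, `random_half` draws from a shuffled copy, and `even_only` shows a user-supplied selection function.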
Various modules in FedERA
FedERA is composed of four modules, each building upon the last.
Verification module. Before aggregating, the server will perform a special verification round to determine which models to accommodate during aggregation.
Timeout module. Instead of waiting indefinitely for a client to finish training, the server can issue a timeout. When the timeout expires, the client stops training and returns its results, even if it has not completed all epochs.
Intermediate client connections module. New clients will be able to join the server anytime and may even be included in a round that is already live.
Carbon emissions tracking module. The framework will be able to track the carbon emissions of the clients during the training process.
Verification module
After the server receives the trained weights, it aggregates all of them to form the new model. However, the selection of models for aggregation can be modified.
Before aggregation, the server passes the models to a Verification module, which then uses a predefined procedure to generate scores for models, and then returns only those models that have performed above a defined threshold.
The Verification module can be easily customized.
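A verification step of this kind can be sketched as a filter over the received models. The names `verify_models` and `score_fn` are hypothetical, the toy models are plain dictionaries, and treating a score equal to the threshold as acceptable is an assumption.

```python
def verify_models(models, score_fn, threshold):
    """Return only the models whose score is at or above the threshold."""
    accepted = []
    for model in models:
        score = score_fn(model)
        if score >= threshold:
            accepted.append(model)
    return accepted


# Toy "models": each one is a dict that already carries a validation accuracy.
received = [
    {"client": "a", "accuracy": 0.91},
    {"client": "b", "accuracy": 0.42},
    {"client": "c", "accuracy": 0.77},
]
kept = verify_models(received, score_fn=lambda m: m["accuracy"], threshold=0.5)
```

Swapping in a different `score_fn` is one way such a module could be customized; only the models in `kept` would then be aggregated.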
Steps in the Verification module

Modified Federated Learning architecture


Timeout module
Often in real world scenarios, clients cannot keep training indefinitely. Therefore, a timeout functionality has been implemented.
The server can specify a timeout parameter as a Train order configuration. The client will then train till the timeout occurs, and then return the results.
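The idea can be sketched with a deadline that is computed once and checked during training. `train_with_deadline` and `train_step` are hypothetical names, and checking the deadline before every batch is a choice made for this sketch.

```python
import time


def train_with_deadline(num_epochs, batches_per_epoch, train_step, timeout=None):
    """Run train_step until all epochs finish or the deadline passes.

    Returns the number of batches that were actually processed.
    """
    deadline = time.time() + timeout if timeout else None
    processed = 0
    for _ in range(num_epochs):
        for batch_index in range(batches_per_epoch):
            if deadline is not None and time.time() >= deadline:
                return processed  # stop early and hand back partial results
            train_step(batch_index)
            processed += 1
    return processed


# Without a timeout every batch runs; with a short timeout training stops early.
full_run = train_with_deadline(2, 3, train_step=lambda i: None)
short_run = train_with_deadline(
    1000, 1000, train_step=lambda i: time.sleep(0.001), timeout=0.05
)
```

`full_run` counts all six batches, while `short_run` ends long before the one million batches it was asked for.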
Steps in the Timeout module

Intermediate client connections module
Now, even during the middle of a communication round, the server can accept new client connections, incorporate them into the Client_Manager and even include them in the ongoing communication round as well.
The server can be easily configured to allow or reject new connections during different parts of Federated Learning.
Safeguards have been implemented to report when a client disconnects at any point.
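One way to picture this behaviour is a thread-safe registry that can grow or shrink while a round is running. `ClientRegistry` and its methods are hypothetical and do not reflect how the Client_Manager is implemented.

```python
import threading


class ClientRegistry:
    """Thread-safe list of connected clients that can change mid-round."""

    def __init__(self, accept_new=True):
        self._lock = threading.Lock()
        self._clients = []
        self.accept_new = accept_new

    def connect(self, client_id):
        """Register a client; return False if new connections are rejected."""
        with self._lock:
            if not self.accept_new:
                return False
            if client_id not in self._clients:
                self._clients.append(client_id)
            return True

    def disconnect(self, client_id):
        """Remove a client and report whether it was registered."""
        with self._lock:
            if client_id in self._clients:
                self._clients.remove(client_id)
                return True
            return False

    def snapshot(self):
        """Return a copy of the current client list."""
        with self._lock:
            return list(self._clients)


registry = ClientRegistry()
registry.connect("client-0")
round_participants = registry.snapshot()  # the round starts with one client
registry.connect("client-1")              # joins while the round is live
round_participants = registry.snapshot()  # now includes the late joiner
registry.accept_new = False               # server stops accepting connections
rejected = registry.connect("client-2")
```

The `accept_new` switch stands in for the server configuration that allows or rejects connections during different phases.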
Carbon emissions tracking module
FedERA uses the CodeCarbon package to estimate the carbon emissions generated by clients during training. CodeCarbon is a Python package that estimates the carbon emissions associated with running software.
Tested on
FedERA has been extensively tested on and works with the following devices:
Intel CPUs
Nvidia GPUs
Nvidia Jetson
Raspberry Pi
Intel NUC
With FedERA, it is possible to operate the server and clients on separate devices or on a single device through various means, such as utilizing different terminals or implementing multiprocessing.

Installation
Install the package
Follow this procedure to prepare the environment and install FedERA:
Install a Python 3.8 (>=3.6, <=3.9) virtual environment using venv.
See the Venv installation guide for details.
- Create a new environment for the project.
- Using Virtual Environment
$ python3 -m venv env
- Using conda
$ conda create -n env python=3.9
Activate the virtual environment.
- Virtual Environment
$ source env/bin/activate
- Conda Environment
$ conda activate env
Install the FedERA package.
Install the stable version with pip:
$ pip install feder==$version$
Install the latest version from GitHub:
Clone the FedERA repository:
$ git clone https://github.com/anupamkliv/FedERA.git
$ cd FedERA
Install dependencies:
$ pip install -r requirements.txt
FedERA with Docker
Follow this procedure to build a Docker image of FedERA:
Note
The purpose of the Docker edition of FedERA is to provide an isolated environment complete with the prerequisites to run. Once the execution is finished, the container can be eliminated, and the computation results will be accessible in a directory on the local host.
Install Docker on all nodes in the federation.
See the Docker installation guide for details.
Check that Docker is running properly with the Hello World command:
$ docker run hello-world
Hello from Docker!
This message shows that your installation appears to be working correctly.
...
Build the Docker image of FedERA:
$ docker build -t federa .
Run the Docker image of FedERA:
$ docker run federa
Tutorials
FedERA allows you to do federated learning in real time on various supported edge devices, including Intel CPUs, Nvidia GPUs, Nvidia Jetson, Raspberry Pi, and Intel NUC. FedERA provides modular tools and standard algorithms that simplify real-time federated learning using the gRPC framework.
Step-by-step guide on running server and clients on same and different devices.
Step-by-step guide on how to customize federated learning algorithm.
Step-by-step guide on how to use different data distributions while training.
Step-by-step guide on how to get carbon footprint of the training process.
Running the Server and Clients
Starting the Server
The server is started by running the following command in the root directory of the framework:
python -m federa.server.start_server
Arguments that can be passed to the server are:
Argument | Description | Default |
---|---|---|
 | specifies the aggregation algorithm | |
 | specifies number of clients selected per round | |
 | specifies fraction of clients selected | |
 | specifies total number of rounds | |
 | specifies initial server model path | |
 | specifies client epochs per round | |
 | determines if connections accepted after FL begins | |
 | specifies if verification module runs before rounds | |
 | specifies minimum verification score | |
 | specifies client training time limit per round | |
 | specifies dataset resize dimension | |
 | specifies dataset batch size | |
 | specifies network architecture | |
 | specifies dataset name | |
 | specifies data distribution among clients | |
 | specifies if carbon emissions tracked at client side | |
 | specifies whether to use SSL encryption or not | |
 | specifies path to server key certificate | |
 | specifies path to server certificate | |
Starting the Clients
The clients are started by running the following command in the root directory of the framework:
python -m federa.client.start_client
Arguments that can be passed to the clients are:
Argument | Description | Default |
---|---|---|
 | specifies server IP address | |
 | specifies device | |
 | specifies whether to use SSL encryption or not | |
 | specifies path to CA certificate | |
 | specifies time to wait before reconnecting to the server | |
Federated Learning Algorithms
The implementation of federated learning algorithms in FedERA consists of two components: the training part on the client side and the aggregation part on the server side. The training functions are coded in the net_lib.py file in the client/src directory, while the aggregation functions are located in various files within the algorithms folder in the server/src directory.
The algorithms currently implemented in FedERA are:
FedAvg
FedDyn
FedAdam
FedAdagrad
Scaffold
FedAvgM
Mime
Mimelite
FedYogi
Adding a new algorithm to FedERA
To add a new algorithm to FedERA, you need to implement the training function on the client side and the aggregation function on the server side. The training function should be implemented in the net_lib.py file at client/src directory. The aggregation function should be implemented in a new file in the algorithms folder at server/src directory.
Implementing the training function
The training function should be implemented in the net_lib.py file, in a fashion similar to the following example of the mimelite algorithm:
from copy import deepcopy
import time

import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

# DEVICE is assumed to be defined at module level in net_lib.py.
def train_mimelite(net, state, trainloader, epochs, deadline=None):
    # In MimeLite, the control variate is simply an optimizer state, as in the momentum method
    x = deepcopy(net)
    criterion = torch.nn.CrossEntropyLoss()
    lr = 0.001
    momentum = 0.9
    net.train()
    for _ in tqdm(range(epochs)):
        for images, labels in trainloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            loss = criterion(net(images), labels)
            # Compute the gradient of the loss on this batch with respect to net's parameters
            grads = torch.autograd.grad(loss, net.parameters())
            # Update net's parameters using the gradients and the momentum state
            with torch.no_grad():
                for param, grad, s in zip(net.parameters(), grads, state):
                    param.data = param.data - lr * ((1 - momentum) * grad.data + momentum * s.data)
        # Stop after the current epoch if the deadline has passed
        if deadline:
            current_time = time.time()
            if current_time >= deadline:
                print("deadline occurred.")
                break
    # Compute the gradient with respect to the received model (x) using the whole dataset
    data = DataLoader(trainloader.dataset, batch_size=len(trainloader) * trainloader.batch_size, shuffle=True)
    for images, labels in data:
        images, labels = images.to(DEVICE), labels.to(DEVICE)
        output = x(images)
        loss = criterion(output, labels)  # Loss of x's output against the labels
        gradient_x = torch.autograd.grad(loss, x.parameters())
    return net, gradient_x
After making the changes in the net_lib.py file, the client_lib.py file also needs to be updated so as to incorporate the newly defined algorithm. The client_lib.py file is located at client/src directory. The following code snippet shows the train function that needs to be updated in the client_lib.py file:
def train(train_order_message):
    data_bytes = train_order_message.modelParameters
    data = torch.load(BytesIO(data_bytes), map_location="cpu")
    model_parameters, control_variate, control_variate2 = data['model_parameters'], data['control_variate'], data['control_variate2']
    config_dict_bytes = train_order_message.configDict
    config_dict = json.loads(config_dict_bytes.decode("utf-8"))
    carbon_tracker = config_dict["carbon_tracker"]
    model = get_net(config=config_dict)
    model.load_state_dict(model_parameters)
    model = model.to(device)
    epochs = config_dict["epochs"]
    if config_dict["timeout"]:
        deadline = time.time() + config_dict["timeout"]
    else:
        deadline = None

    # Run CodeCarbon if the carbon-tracker flag is True
    if (carbon_tracker==1):
        tracker = OfflineEmissionsTracker(country_iso_code="IND", output_dir=save_dir_path)
        tracker.start()

    trainloader, testloader, _ = load_data(config_dict)
    print("training started")
    if (config_dict['algorithm'] == 'mimelite'):
        model, control_variate = train_mimelite(model, control_variate, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'scaffold'):
        model, control_variate = train_scaffold(model, control_variate, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'mime'):
        model, control_variate = train_mime(model, control_variate, control_variate2, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'fedavg'):
        model = train_fedavg(model, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'feddyn'):
        model = train_feddyn(model, trainloader, epochs, deadline)
    else:
        model = train_model(model, trainloader, epochs, deadline)
    print("training finished")

    if (carbon_tracker==1):
        emissions: float = tracker.stop()
        print(f"Emissions: {emissions} kg")

    myJSON = json.dumps(config_dict)
    json_path = save_dir_path + "/config.json"
    with open(json_path, "w") as jsonfile:
        jsonfile.write(myJSON)
    json_path = "config.json"
    with open(json_path, "w") as jsonfile:
        jsonfile.write(myJSON)

    trained_model_parameters = model.state_dict()
    # Create a dictionary holding the model_parameters and control_variate to send to the server
    data_to_send = {}
    data_to_send['model_parameters'] = trained_model_parameters
    data_to_send['control_variate'] = control_variate  # If there is no control_variate, this will be None
    buffer = BytesIO()
    torch.save(data_to_send, buffer)
    buffer.seek(0)
    data_to_send_bytes = buffer.read()

    print("train eval")
    train_loss, train_accuracy = test_model(model, testloader)
    response_dict = {"train_loss": train_loss, "train_accuracy": train_accuracy}
    response_dict_bytes = json.dumps(response_dict).encode("utf-8")
    train_response_message = TrainResponse(
        modelParameters=data_to_send_bytes,
        responseDict=response_dict_bytes)
    save_model_state(model)
    return train_response_message
Implementing the aggregation function
The aggregation function should be implemented within a class in a new file in the algorithms folder in the server/src directory. The following code snippet shows the aggregation function for the mimelite algorithm as defined in the mimelite.py file:
import functools
from collections import OrderedDict

class mimelite():
    def __init__(self, config):
        self.algorithm = "MimeLite"
        self.lr = 1.0
        self.momentum = 0.9

    def aggregate(self, server_model_state_dict, optimizer_state, state_dicts, gradients_x):
        keys = server_model_state_dict.keys()  # List of keys in a state_dict
        avg_y = OrderedDict()  # This will be our new server_model_state_dict
        for key in keys:
            current_key_tensors = [state_dict[key] for state_dict in state_dicts]
            current_key_sum = functools.reduce(lambda accumulator, tensor: accumulator + tensor, current_key_tensors)
            current_key_average = current_key_sum / len(state_dicts)
            avg_y[key] = current_key_average

        # Average all the gradient_x in gradients_x
        avg_grads = []
        for i in range(len(gradients_x[0])):
            # Average the i-th element of every gradient_x in gradients_x
            current_tensors = [gradient_x[i] for gradient_x in gradients_x]
            current_sum = functools.reduce(lambda accumulator, tensor: accumulator + tensor, current_tensors)
            current_average = current_sum / len(gradients_x)
            avg_grads.append(current_average)

        # Update the optimizer state with the averaged gradients
        for state, grad in zip(optimizer_state, avg_grads):
            state.data = self.momentum * state.data + (1 - self.momentum) * grad.data
        return avg_y, optimizer_state
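The state update at the end of aggregate can be followed with small numbers. The sketch below mirrors the same formula on plain Python lists instead of tensors; `update_state` is a hypothetical helper, not part of FedERA.

```python
def update_state(state, client_gradients, momentum=0.9):
    """Blend the old optimizer state with the average client gradient."""
    num_clients = len(client_gradients)
    # Average the gradients across clients, position by position.
    avg_grad = [sum(values) / num_clients for values in zip(*client_gradients)]
    # state <- momentum * state + (1 - momentum) * avg_grad
    return [momentum * s + (1 - momentum) * g for s, g in zip(state, avg_grad)]


state = [0.0, 1.0]
client_gradients = [[1.0, 2.0], [3.0, 4.0]]  # one list per client
new_state = update_state(state, client_gradients, momentum=0.9)
```

The average gradient here is [2.0, 3.0], so the new state is approximately [0.2, 1.2]: with a momentum of 0.9 the old state dominates and each round only nudges it toward the latest gradients.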
Datasets
The datasets used by FedERA are fetched from torchvision.datasets. As of now, FedERA supports the following datasets:
MNIST
FashionMNIST
CIFAR10
CIFAR100
Adding support for new datasets
There are two methods for adding support for new datasets in FedERA. One uses torchvision.datasets, while the other implements support for a custom dataset.
Adding support for a dataset available in torchvision.datasets
The torchvision.datasets package consists of popular datasets used in computer vision. The datasets are downloaded and cached automatically. They are subclasses of torch.utils.data.Dataset, i.e. they share the same API, which makes it easy to add support for new datasets in FedERA. All that is required is to add a few lines of code to the get_data function in the get_data.py file.
def get_data(config):
    # If the dataset is not custom, create a dataset folder
    if config['dataset'] != 'CUSTOM':
        dataset_path = "client_dataset"
        if not os.path.exists(dataset_path):
            os.makedirs(dataset_path)

    # Get the train and test datasets for each supported dataset
    if config['dataset'] == 'MNIST':
        # Apply transformations to the images
        apply_transform = transforms.Compose([transforms.Resize(config["resize_size"]), transforms.ToTensor()])
        # Download and load the trainset
        trainset = datasets.MNIST(root='client_dataset/MNIST', train=True, download=True, transform=apply_transform)
        # Download and load the testset
        testset = datasets.MNIST(root='client_dataset/MNIST', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'FashionMNIST':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.FashionMNIST(root='client_dataset/FashionMNIST', train=True, download=True, transform=apply_transform)
        testset = datasets.FashionMNIST(root='client_dataset/FashionMNIST', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'CIFAR10':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.CIFAR10(root='client_dataset/CIFAR10', train=True, download=True, transform=apply_transform)
        testset = datasets.CIFAR10(root='client_dataset/CIFAR10', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'CIFAR100':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.CIFAR100(root='client_dataset/CIFAR100', train=True, download=True, transform=apply_transform)
        testset = datasets.CIFAR100(root='client_dataset/CIFAR100', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'CUSTOM':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        # Load the custom dataset
        trainset = customDataset(root='client_custom_dataset/CUSTOM/train', transform=apply_transform)
        testset = customDataset(root='client_custom_dataset/CUSTOM/test', transform=apply_transform)
    else:
        # Raise an error if an unsupported dataset is specified
        raise ValueError("Unsupported dataset type: {}".format(config['dataset']))

    # Return the train and test datasets
    return trainset, testset
For example, to add support for the STL10 dataset, the following lines of code can be added to the get_data function:
    elif config['dataset'] == 'STL10':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.STL10(root='client_dataset/STL10', split='train', download=True, transform=apply_transform)
        testset = datasets.STL10(root='client_dataset/STL10', split='test', download=True, transform=apply_transform)
Adding support for a custom dataset
To add support for a custom dataset, the train and test sets must be placed in the train and test folders, respectively, within the client/client_custom_dataset/CUSTOM/ directory. The train and test data must be stored in .npy files. The customDataset class, which loads the custom data, is defined in the get_data.py file and can be changed to suit the custom dataset. It is a subclass of torch.utils.data.Dataset and has the same API as the datasets in torchvision.datasets. The following code snippet shows the customDataset class:
class customDataset(data.Dataset):
    def __init__(self, root, transform=None):
        """
        Custom dataset class for loading image and label data from a folder of .npy files.

        Args:
            root (str): Path to the folder containing the .npy files.
            transform (callable, optional): A function/transform that takes in a PIL image and returns a transformed version.
                E.g., `transforms.RandomCrop`
        """
        self.root = root
        samples = sample_return(root)
        self.samples = samples
        self.transform = transform

    def __getitem__(self, index):
        """
        Retrieves a sample from the dataset at the given index.

        Args:
            index (int): Index of the sample to retrieve.

        Returns:
            img (PIL.Image): The image data.
            label (int): The label for the image data.
        """
        img, label = self.samples[index]
        img = np.load(img)
        img = Image.fromarray(img)
        if self.transform is not None:
            img = self.transform(img)
        return img, label

    def __len__(self):
        return len(self.samples)
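The class relies on a sample_return helper that is not shown here. The sketch below is a hypothetical stand-in named `list_npy_samples`, and it assumes a layout of `root/<label>/<name>.npy` with integer folder names. The real helper and directory layout may differ.

```python
import os
import tempfile


def list_npy_samples(root):
    """Return sorted (file_path, label) pairs for root/<label>/<name>.npy."""
    samples = []
    for label_name in sorted(os.listdir(root)):
        label_dir = os.path.join(root, label_name)
        if not os.path.isdir(label_dir):
            continue
        for file_name in sorted(os.listdir(label_dir)):
            if file_name.endswith(".npy"):
                # The folder name is assumed to be the integer class label.
                samples.append((os.path.join(label_dir, file_name), int(label_name)))
    return samples


# Build a throwaway directory tree to show the shape of the result.
with tempfile.TemporaryDirectory() as root:
    for label, name in [("0", "a.npy"), ("1", "b.npy")]:
        os.makedirs(os.path.join(root, label), exist_ok=True)
        open(os.path.join(root, label, name), "wb").close()
    found = list_npy_samples(root)
    labels = [label for _, label in found]
```

Each entry of `found` pairs a file path with its label, which matches how `__getitem__` unpacks `self.samples[index]` before loading the array.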
Models
The models currently implemented in the framework are:
LeNet-5
ResNet-18
ResNet-50
VGG-16
AlexNet
The server_lib.py file contains the implementation of the deep learning models for the server, while the net.py file contains the implementation of these models for the client. These models are either created by inheriting from torch.nn.Module or imported from torchvision.models.
Adding support for a new model
There are two ways to add support for new models in FedERA. One involves creating a new class that inherits from torch.nn.Module, and the other involves importing a model from torchvision.models. The first method is more flexible and allows for more customization, while the second is easier to implement and is recommended for beginners.
Adding support for a new model by inheriting from torch.nn.Module
To add support for a new model by inheriting from torch.nn.Module, the following steps need to be followed:
Create a new class that inherits from torch.nn.Module and defines the model, and add it to both the server_lib.py file and the net.py file. The code for LeNet is given below as an example:
class LeNet(nn.Module):
    def __init__(self, in_channels=1, num_classes=10):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 6, kernel_size=5)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(400, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)
        self.relu = nn.ReLU()
        self.logSoftmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.pool2(x)
        x = x.view(-1, 400)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.logSoftmax(x)
        return x
The models implemented in client_lib.py and net.py files are obtained through the get_net function defined in both files. This function takes the configuration, reads the name of the model from it as a string, and returns the corresponding model. To add support for a new model, the name of the model needs to be added to the get_net function in both files, with the appropriate changes. The code for the get_net function in client_lib.py is given below as an example:
def get_net(config):
    if config["net"] == 'LeNet':
        if config['dataset'] in ['MNIST', 'FashionMNIST', 'CUSTOM']:
            net = LeNet(in_channels=1, num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = LeNet(in_channels=3, num_classes=10)
        else:
            net = LeNet(in_channels=3, num_classes=100)
    if config["net"] == 'resnet18':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.resnet18(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.resnet18(num_classes=10)
        else:
            net = models.resnet18(num_classes=100)
    if config["net"] == 'resnet50':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.resnet50(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.resnet50(num_classes=10)
        else:
            net = models.resnet50(num_classes=100)
    if config["net"] == 'vgg16':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.vgg16(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.vgg16(num_classes=10)
        else:
            net = models.vgg16(num_classes=100)
    if config['net'] == 'AlexNet':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.alexnet(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.alexnet(num_classes=10)
        else:
            net = models.alexnet(num_classes=100)
    return net
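Every branch of get_net repeats the same dataset checks. As one possible way to reduce that repetition when adding models, the dataset-specific values could live in a lookup table. The names `DATASET_SPECS`, `DEFAULT_SPEC` and `dataset_spec` are hypothetical; the values are copied from the LeNet branch above.

```python
# Channel and class counts per dataset, taken from the LeNet branch of get_net.
DATASET_SPECS = {
    "MNIST": {"in_channels": 1, "num_classes": 10},
    "FashionMNIST": {"in_channels": 1, "num_classes": 10},
    "CUSTOM": {"in_channels": 1, "num_classes": 10},
    "CIFAR10": {"in_channels": 3, "num_classes": 10},
}

# Mirrors the final else branch, which is used for every other dataset.
DEFAULT_SPEC = {"in_channels": 3, "num_classes": 100}


def dataset_spec(config):
    """Look up the input channels and class count for the configured dataset."""
    return DATASET_SPECS.get(config["dataset"], DEFAULT_SPEC)


mnist = dataset_spec({"dataset": "MNIST"})
cifar100 = dataset_spec({"dataset": "CIFAR100"})
```

A model branch could then call, for example, `LeNet(**dataset_spec(config))` instead of spelling out each dataset case.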
Adding support for a new model by importing from torchvision.models
To add support for a new model by importing from torchvision.models, import the model from torchvision.models in the server_lib.py and net.py files and make the appropriate changes in the get_net function. The code that needs to be added to the get_net function to use the ResNet-34 model is given below as an example:
    if config["net"] == 'resnet34':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.resnet34(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.resnet34(num_classes=10)
        else:
            net = models.resnet34(num_classes=100)
Data Distribution
FedERA can train with either an IID or a non-IID data distribution. To specify the data distribution, use the "--iid" flag; the code below reads the value as config['niid']. When the flag is set to 1, the data distribution is IID. When it is set to a value from 2 to 5, the data distribution is non-IID, and each value corresponds to a different non-IID distribution. The distributions are generated as follows:
def data_distribution(config, trainset):
    labels = []
    base_dir = os.getcwd()
    storepath = os.path.join(base_dir, 'Distribution/', config['dataset']+'/')
    seed = 10
    random.seed(seed)
    num_users = 5

    # Calculate the number of samples present per class
    for i in range(len(trainset)):
        labels.append(trainset[i][1])
    unique_labels = np.unique(np.array(labels))
    label_index_list = {}
    for key in unique_labels:
        label_index_list[key] = []
    for index, label in enumerate(labels):
        label_index_list[label].append(index)
    num_classes = len(unique_labels)

    # Calculate the step of the probability distribution. For K=1, the distribution is IID
    K = config['niid']
    if (K==1):
        q_step = (1 - (1/num_classes))
    else:
        q_step = (1 - (1/num_classes))/(K-1)

    # Shuffle the index positions for all classes
    for i in range(len(label_index_list)):
        random.shuffle(label_index_list[i])

    # Generate the different non-IID distributions. data_presence_indicator reduces the
    # number of classes per client as the non-IID level increases
    for j in range(K):
        dist = np.random.uniform(q_step, (1+j)*q_step, (num_classes, num_users))
        if j != 0:
            data_presence_indicator = np.random.choice([0, 1], (num_classes, num_users), p=[j*q_step, 1-(j*q_step)])
            if len(np.where(np.sum(data_presence_indicator, axis=0) == 0)[0])>0:
                for i in np.where(np.sum(data_presence_indicator, axis=0) == 0)[0]:
                    zero_array = data_presence_indicator[:,i]
                    zero_array[np.random.choice(len(zero_array),1)] =1
                    data_presence_indicator[:,i] = zero_array
            dist = np.multiply(dist,data_presence_indicator)

    # Only the distribution from the final iteration (j = K-1) is used below
    psum = np.sum(dist, axis=1)
    for i in range(dist.shape[0]):
        dist[i] = dist[i]*len(label_index_list[i])/(psum[i]+0.00001)
    dist = np.floor(dist).astype(int)

    # If any client does not get any data, reallocate samples from other clients to it
    gainers = list(np.where(np.sum(dist, axis=0) != 0))[0]
    if len(gainers) < num_users:
        losers = list(np.where(np.sum(dist, axis=0) == 0))[0]
        donors = np.random.choice(gainers, len(losers))
        for index, donor in enumerate(donors):
            avail_digits = np.where(dist[:,donor] != 0)[0]
            for digit in avail_digits:
                transfer_frac = np.random.uniform(0.1,0.9)
                num_transfer = int(dist[digit, donor]*transfer_frac)
                dist[digit, donor] = dist[digit, donor] - num_transfer
                dist[digit, losers[index]] = num_transfer

    # Check that the samples assigned across clients sum to the total number of samples
    # present for each class, and adjust if they do not
    for num in range(num_classes):
        while dist[num].sum() != len(label_index_list[num]):
            index = random.randint(0,num_users-1)
            if dist[num].sum() < len(label_index_list[num]):
                dist[num][index]+=1
            else:
                dist[num][index]-=1

    # Divide the sample counts among the clients
    split = [[] for i in range(num_classes)]
    for num in range(num_classes):
        start = 0
        for i in range(num_users):
            split[num].append(label_index_list[num][start:start+dist[num][i]])
            start = start+dist[num][i]

    # Divide the actual data points among the clients
    datapoints = [[] for i in range(num_users)]
    class_histogram = [[] for i in range(num_users)]
    class_stats= [[] for i in range(num_users)]
    for i in range(num_users):
        for num in range(num_classes):
            datapoints[i] += split[num][i]
            class_histogram[i].append(len(split[num][i]))
            if(len(split[num][i])==0):
                class_stats[i].append(0)
            else:
                class_stats[i].append(1)

    # Store the dataset division in the folder
    if not os.path.exists(storepath):
        os.makedirs(storepath)
    file_name = 'data_split_niid_'+ str(K)+'.pt'
    torch.save({'datapoints': datapoints, 'histograms': class_histogram, 'class_statitics': class_stats}, storepath + file_name)
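The role of q_step is easier to see with numbers. The sketch below recomputes it for a 10-class dataset and each value of K, together with the probability that a class is masked out for a client in the last loop iteration (j = K-1). The helper names are hypothetical; the formulas are taken from the function above.

```python
def q_step(num_classes, k):
    """Step size used by data_distribution for non-IID level k."""
    base = 1 - (1 / num_classes)
    return base if k == 1 else base / (k - 1)


def final_drop_probability(num_classes, k):
    """Probability of a 0 in data_presence_indicator when j = k - 1."""
    return (k - 1) * q_step(num_classes, k)


steps = {k: q_step(10, k) for k in range(1, 6)}
drops = {k: final_drop_probability(10, k) for k in range(1, 6)}
```

For 10 classes the step sizes are roughly 0.9, 0.9, 0.45, 0.3 and 0.225 for K = 1 to 5. For K = 1 the mask is never applied, while for K of 2 or more the last iteration masks each class for each client with probability of about 0.9.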
Visualizing the non-IID data distribution for MNIST dataset
Classwise distribution of samples among the clients for different non-IID distribution

Classwise distribution of samples among the clients for non-IID distribution 1

Classwise distribution of samples among the clients for non-IID distribution 2

Classwise distribution of samples among the clients for non-IID distribution 3

Classwise distribution of samples among the clients for non-IID distribution 4

Classwise distribution of samples among the clients for non-IID distribution 5
Samplewise distribution of samples among the clients for different non-IID distribution

Samplewise distribution of samples among the clients for non-IID distribution 1

Samplewise distribution of samples among the clients for non-IID distribution 2

Samplewise distribution of samples among the clients for non-IID distribution 3

Samplewise distribution of samples among the clients for non-IID distribution 4

Samplewise distribution of samples among the clients for non-IID distribution 5
Carbon Emissions
CodeCarbon is a Python package that provides an estimation of the carbon emissions associated with software code. It can be integrated into software development workflows and offers real-time feedback on the environmental impact of the code being developed. CodeCarbon helps developers and organizations become more environmentally conscious by optimizing their code and making better choices regarding the hardware and infrastructure used to run it. It enables companies to achieve their sustainability goals and demonstrate their commitment to reducing their environmental impact.
To estimate the carbon emissions generated by clients during training, CodeCarbon has been utilized in the client_lib.py file, located in the client/src/ directory. By default, the client’s location is set to India, but it can be modified to reflect the client’s actual location. The following code snippet illustrates how CodeCarbon is used in the client_lib.py file:
# Run CodeCarbon if the carbon-tracker flag is set
if (carbon_tracker==1):
    tracker = OfflineEmissionsTracker(country_iso_code="IND", output_dir = save_dir_path)
    tracker.start()
trainloader, testloader, _ = load_data(config_dict)
print("training started")
if (config_dict['algorithm'] == 'mimelite'):
    model, control_variate = train_mimelite(model, control_variate, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'scaffold'):
    model, control_variate = train_scaffold(model, control_variate, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'mime'):
    model, control_variate = train_mime(model, control_variate, control_variate2, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'fedavg'):
    model = train_fedavg(model, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'feddyn'):
    model = train_feddyn(model, trainloader, epochs, deadline)
else:
    model = train_model(model, trainloader, epochs, deadline)
print("training finished")
if (carbon_tracker==1):
    emissions: float = tracker.stop()
    print(f"Emissions: {emissions} kg")
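The excerpt above starts the tracker before training and stops it afterwards, so an exception raised during training would leave the tracker running. One way to guard against that, shown here as a minimal sketch rather than FedERA's implementation, is to wrap the start/stop pair in a context manager. The names maybe_track, StubTracker, and train_stub are hypothetical; StubTracker only imitates the start()/stop() pair used above and returns a placeholder value, not a measurement.

```python
from contextlib import contextmanager


class StubTracker:
    """Hypothetical stand-in exposing the start()/stop() pair used above."""

    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False
        return 0.0  # placeholder value, not a real emissions estimate


@contextmanager
def maybe_track(enabled, make_tracker):
    """Start a tracker only when enabled, and always stop it on exit."""
    tracker = make_tracker() if enabled else None
    if tracker is not None:
        tracker.start()
    report = {"emissions": None}
    try:
        yield report
    finally:
        # Runs even if the training code inside the with-block raises.
        if tracker is not None:
            report["emissions"] = tracker.stop()


def train_stub():
    """Hypothetical placeholder for the algorithm-specific training call."""
    return "trained"


with maybe_track(True, StubTracker) as report:
    outcome = train_stub()

with maybe_track(False, StubTracker) as untracked:
    train_stub()
```

With a real tracker, the factory passed as make_tracker would construct it with the desired country code and output directory.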
Encryption
In the FedERA framework, encryption plays a crucial role in ensuring secure communication between the client and server during the Federated Learning process. This section provides guidance on generating and configuring the necessary certificates for TLS/SSL encryption.
TLS Basics
To understand the encryption process, it’s essential to grasp the fundamentals of TLS/SSL and chains of trust. TLS/SSL operates based on a transitive trust model, where trust in a certificate authority (CA) extends to the certificates it generates. Web browsers and operating systems have a “Trusted Roots” certificate store, automatically trusting certificates from public certificate authorities such as Let’s Encrypt or GoDaddy.
In the case of FedERA, we establish our own CA and need to inform the client about the CA certificate for trust verification. Additionally, the server certificate must contain the exact server name the client connects to for validation.
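The two requirements above (trusting a private CA and matching the server name) can be illustrated with Python's standard ssl module. This is only a sketch of the trust model, not the gRPC configuration FedERA uses; build_client_context and its ca_file parameter are hypothetical names.

```python
import ssl


def build_client_context(ca_file=None):
    """Return a client-side TLS context that verifies the server.

    ca_file is a hypothetical path to a private CA certificate such as
    ca.pem. When it is omitted, the system trust store is loaded instead.
    """
    # SERVER_AUTH contexts require a valid certificate chain and check that
    # the certificate matches the host name the client connects to.
    return ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)


context = build_client_context()
requires_certificate = context.verify_mode == ssl.CERT_REQUIRED
checks_hostname = context.check_hostname
```

Passing the CA file generated in the next section as ca_file would make the context trust certificates signed by that CA.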
Generate Certificates
For the purpose of this example, we will set up a basic PKI using CloudFlare’s CFSSL toolset, specifically the cfssl and cfssljson tools. Both tools are available from the CFSSL project.
The ssl directory contains configuration files that can be modified, but for demonstration purposes, they can also be used as-is.
Generate CA Certificate and Config
To generate the CA certificate and configuration, navigate to the ssl directory and run the following command:
$ cd FedERA/ssl
$ cfssl gencert -initca ca-csr.json | cfssljson -bare ca
This command generates the ca.pem and ca-key.pem files. The ca.pem file is used by both the client and server for mutual verification.
Generate Server and Client Certificates
Server Certificate
To generate the server certificate and key pair, run the following command in the ssl directory:
$ cd FedERA/ssl
$ cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -hostname='127.0.0.1,localhost' server-csr.json | cfssljson -bare server
This command creates the server certificate and key pair to be used by the server during TLS/SSL encryption. Note that you can modify the hostname parameter to match the name or IP address of the server on your network.
Client Certificate
To generate the client certificate and key pair, use the following command in the ssl directory:
$ cd FedERA/ssl
$ cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json client-csr.json | cfssljson -bare client
When generating the client certificate and key pair, a warning message may appear regarding the absence of a “hosts” field. This warning is expected and acceptable since the client certificate is only used for client identification, not server identification.
TLS Server Identification and Encryption
In FedERA, the client trusts the certificate authority certificate, which subsequently enables trust in the server certificate. This is similar to how web browsers handle certificates, where pre-installed public certificate authority certificates establish trust.
For one-way trust verification (client verifies server identity but not vice versa), the server does not necessarily need to present the CA certificate as part of its certificate chain. The server only needs to present enough of the certificate chain for the client to trace it back to a trusted CA certificate.
In the FedERA framework, the gRPC server can be configured for SSL using the following code snippet:
On server side
if configurations['encryption']==1:
    # Load the server's private key and certificate
    keyfile = configurations['server_key']
    certfile = configurations['server_cert']
    private_key = bytes(open(keyfile).read(), 'utf-8')
    certificate_chain = bytes(open(certfile).read(), 'utf-8')
    # Create SSL/TLS credentials object
    server_credentials = ssl_server_credentials([(private_key, certificate_chain)])
    server.add_secure_port('localhost:8214', server_credentials)
On client side
if config["encryption"] == 1:
    ca_cert = 'ca.pem'
    root_certs = bytes(open(ca_cert).read(), 'utf-8')
    credentials = grpc.ssl_channel_credentials(root_certs)
    # Create a new gRPC channel to the server
    channel = grpc.secure_channel(ip_address, options=[
        ('grpc.max_send_message_length', -1),
        ('grpc.max_receive_message_length', -1)
    ], credentials=credentials)
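Both snippets read the key and certificate files as text and re-encode them to bytes without closing the file. A minimal sketch of an alternative, assuming a hypothetical helper named load_pem that is not part of FedERA, reads the bytes directly and fails early if the file does not look like PEM data. The demo file is a placeholder, not a real certificate.

```python
import tempfile
from pathlib import Path

# Every PEM block starts with a "-----BEGIN <label>-----" line.
PEM_MARKER = b"-----BEGIN "


def load_pem(path):
    """Read a PEM file as bytes and check that it contains a PEM block."""
    data = Path(path).read_bytes()  # opens, reads, and closes the file
    if PEM_MARKER not in data:
        raise ValueError(f"{path} does not contain a PEM block")
    return data


with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "demo.pem"
    demo.write_bytes(
        b"-----BEGIN CERTIFICATE-----\nplaceholder\n-----END CERTIFICATE-----\n"
    )
    loaded = load_pem(demo)
```

The returned bytes can be passed to the credential constructors in place of the bytes(open(...).read(), 'utf-8') expressions.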
Acknowledgments
This code and information were developed with the help of the repository jottoekke/python-grpc-ssl, which provided valuable guidance in implementing the encryption functionality.
Contribution to FedERA
Reporting bugs
We use GitHub issues to track bugs and feature requests. If you come across a bug, run into a problem while using the package, or have an idea for a feature, don’t hesitate to open an issue. Pull requests are also welcome.
Contributing to FedERA
If you wish to contribute to the project by submitting code, you can do so by creating a Pull Request. By contributing code, you agree that your contributions will be licensed under Apache License, Version 2.0.
We encourage you to contribute to the enhancement of FedERA or the implementation of existing FL methods within FedERA. The recommended method for contributing to FedERA is to fork the main repository on GitHub, clone it, and develop on a branch. Follow these steps:
Click on “Fork” to fork the project repository.
Clone your forked repository from your GitHub account to your local machine (the URL below is the main repository; substitute the URL of your fork):
$ git clone https://github.com/anupamkliv/FedERA.git
and then navigate to the FedERA directory using the command
$ cd FedERA
Create a new branch to save your changes using the command
$ git checkout -b my-feature
Develop the feature on your branch and use the command
$ git add modified_files
followed by
$ git commit
to save your changes.
Pull Request Checklist
Please follow the file structure below when adding new features, or create a new file if a feature does not fit the existing structure.
Reference
API Reference
This page contains auto-generated API reference documentation [1].
federa
client
src
ClientConnection_pb2
Generated protocol buffer code.
Module Contents
- _sym_db
- DESCRIPTOR
- _SERVERMESSAGE
- _CLIENTMESSAGE
- _TRAINORDER
- _TRAINRESPONSE
- _EVALORDER
- _EVALRESPONSE
- _SETPARAMSORDER
- _SETPARAMSRESPONSE
- _DISCONNECTORDER
- ServerMessage
- ClientMessage
- TrainOrder
- TrainResponse
- EvalOrder
- EvalResponse
- SetParamsOrder
- SetParamsResponse
- DisconnectOrder
- _CLIENTCONNECTION
ClientConnection_pb2_grpc
Client and server classes corresponding to protobuf-defined services.
Module Contents
- class ClientConnectionStub(channel)
Missing associated documentation comment in .proto file.
- class ClientConnectionServicer
Missing associated documentation comment in .proto file.
- Connect(request_iterator, context)
Missing associated documentation comment in .proto file.
- add_ClientConnectionServicer_to_server(servicer, server)
client
Module Contents
- client_start(config)
client_lib
Module Contents
- fl_timestamp
- save_dir_path
- evaluate(eval_order_message, device)
- train(train_order_message, device)
- set_parameters(set_parameters_order_message, device)
- save_model_state(model)
- plot_emission()
data_utils
Module Contents
- class distributionDataloader(config, trainset, data_path, clientID=0, aug=False)
Bases:
torch.utils.data.Dataset
- __len__()
- __getitem__(index)
distribution
Module Contents
- data_distribution(config, trainset)
get_data
Module Contents
- get_data(config)
- class customDataset(root, transform=None)
Bases:
torch.utils.data.Dataset
- __getitem__(index)
Retrieves a sample from the dataset at the given index.
Args:
index (int): Index of the sample to retrieve.
Returns:
img (PIL.Image): The image data.
label (int): The label for the image data.
- __len__()
- sample_return(root)
net
Module Contents
- class LeNet(in_channels=1, num_classes=10)
Bases:
torch.nn.Module
- forward(x)
- get_net(config)
net_lib
Module Contents
- load_data(config)
- flush_memory()
- train_model(net, trainloader, epochs, device, deadline=None)
Trains a neural network model on a given dataset using SGD optimizer with Cross Entropy Loss criterion.
Args:
net: neural network model
trainloader: PyTorch DataLoader object for training dataset
epochs: number of epochs to train the model
deadline: optional deadline time for training
Returns:
trained model with the difference between trained model and the received model
- train_fedavg(net, trainloader, epochs, device, deadline=None)
Trains a given neural network using the Federated Averaging (FedAvg) algorithm.
Args:
net: A PyTorch neural network model
trainloader: A PyTorch DataLoader containing the training dataset
epochs: An integer specifying the number of training epochs
deadline: An optional deadline (in seconds) for the training process
Returns:
A trained PyTorch neural network model
- train_feddyn(net, trainloader, epochs, device, deadline=None)
Trains a given neural network using the FedDyn algorithm.
Args:
net: A PyTorch neural network model
trainloader: A PyTorch DataLoader containing the training dataset
epochs: An integer specifying the number of training epochs
deadline: An optional deadline (in seconds) for the training process
Returns:
A trained PyTorch neural network model
- train_mimelite(net, state, trainloader, epochs, device, deadline=None)
Trains a given neural network using the MimeLite algorithm.
Args:
net: A PyTorch neural network model
trainloader: A PyTorch DataLoader containing the training dataset
epochs: An integer specifying the number of training epochs
deadline: An optional deadline (in seconds) for the training process
Returns:
A trained PyTorch neural network model
In MimeLite, control_variate is simply an optimizer state, as in the momentum method.
- train_mime(net, state, control_variate, trainloader, epochs, device, deadline=None)
Trains a given neural network using the Mime algorithm.
Args:
net: A PyTorch neural network model
trainloader: A PyTorch DataLoader containing the training dataset
epochs: An integer specifying the number of training epochs
deadline: An optional deadline (in seconds) for the training process
Returns:
A trained PyTorch neural network model
- train_scaffold(net, server_c, trainloader, epochs, device, deadline=None)
Trains a given neural network using the Scaffold algorithm.
Args:
net: A PyTorch neural network model
trainloader: A PyTorch DataLoader containing the training dataset
epochs: An integer specifying the number of training epochs
deadline: An optional deadline (in seconds) for the training process
Returns:
A trained PyTorch neural network model
- test_model(net, testloader, device)
Evaluate the performance of a model on a test dataset.
Args:
net (torch.nn.Module): The neural network model to evaluate.
testloader (torch.utils.data.DataLoader): The data loader for the test dataset.
Returns:
Tuple: The average loss and accuracy of the model on the test dataset.
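The training functions above all accept an optional deadline in seconds. How FedERA enforces it is not described here; the following is a minimal sketch of one way a deadline can bound a training loop. The names run_epochs, step, and clock are hypothetical, and the clock is injectable only so the behaviour can be checked deterministically.

```python
import time


def run_epochs(batches, epochs, step, deadline=None, clock=time.monotonic):
    """Call step(batch) for every batch in every epoch until the deadline.

    Returns the number of steps completed. A deadline of None means the
    loop always runs to completion.
    """
    start = clock()
    steps_done = 0
    for _ in range(epochs):
        for batch in batches:
            # Check the elapsed time before each step, not only per epoch.
            if deadline is not None and clock() - start >= deadline:
                return steps_done
            step(batch)
            steps_done += 1
    return steps_done


# Without a deadline every batch is processed in every epoch.
seen = []
total_steps = run_epochs([1, 2, 3], epochs=2, step=seen.append)

# With a fake clock that advances one second per reading, a 3-second
# deadline stops the loop early.
ticks = iter(range(100))
partial_steps = run_epochs(
    [1, 2, 3], epochs=2, step=lambda batch: None,
    deadline=3, clock=lambda: next(ticks),
)
```

Checking the clock before every step keeps the overshoot to at most one step, at the cost of one clock reading per batch.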
server
src
server_evaluate
eval_lib
Module Contents
- server_eval(model_state_dict, config)
Package Contents
- server_eval(model_state_dict, config)
ClientConnection_pb2
Generated protocol buffer code.
Module Contents
- _sym_db
- DESCRIPTOR
- _SERVERMESSAGE
- _CLIENTMESSAGE
- _TRAINORDER
- _TRAINRESPONSE
- _EVALORDER
- _EVALRESPONSE
- _SETPARAMSORDER
- _SETPARAMSRESPONSE
- _DISCONNECTORDER
- ServerMessage
- ClientMessage
- TrainOrder
- TrainResponse
- EvalOrder
- EvalResponse
- SetParamsOrder
- SetParamsResponse
- DisconnectOrder
- _CLIENTCONNECTION
ClientConnection_pb2_grpc
Client and server classes corresponding to protobuf-defined services.
Module Contents
- class ClientConnectionStub(channel)
Missing associated documentation comment in .proto file.
- class ClientConnectionServicer
Missing associated documentation comment in .proto file.
- Connect(request_iterator, context)
Missing associated documentation comment in .proto file.
- add_ClientConnectionServicer_to_server(servicer, server)
client_connection_servicer
Module Contents
- class ClientConnectionServicer(client_manager)
Bases:
federa.server.src.ClientConnection_pb2_grpc.ClientConnectionServicer
- Connect(request_iterator, context)
client_manager
Module Contents
client_wrapper
Module Contents
- class ClientWrapper(send_buffer, recieve_buffer, client_id)
- train(model_parameters, control_variate, control_variate2, config_dict)
- evaluate(model_parameters, config_dict)
- set_parameters(model_parameters)
- check_disconnection()
- is_disconnected()
- disconnect(reconnect_time=0, message='Thank you for participating.')
server
Module Contents
- server_runner(client_manager, configurations)
- server_start(configurations)
server_lib
Module Contents
- device
- load_data(config)
- get_data(config)
- class customDataset(root, transform=None)
Bases:
torch.utils.data.Dataset
- __getitem__(index)
- __len__()
- sample_return(root)
- class LeNet(in_channels=1, num_classes=10)
Bases:
torch.nn.Module
- forward(x)
- get_net(config)
- train_model(net, trainloader)
- test_model(net, testloader)
- save_intial_model(config)
verification
Module Contents
- verify(clients, trained_model_state_dicts, save_dir_path, threshold=0)
- random_derangement(list_to_shuffle)
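The name random_derangement suggests a permutation in which no element stays at its original position. The reference gives no further detail, so the following is only a sketch of one common way to produce such a permutation (rejection sampling), not necessarily how FedERA implements it. The function name random_derangement_sketch and the client labels are hypothetical.

```python
import random


def random_derangement_sketch(items, rng=random):
    """Return a shuffled copy of items in which no element keeps its index."""
    if len(items) < 2:
        raise ValueError("a derangement needs at least two items")
    indices = list(range(len(items)))
    while True:
        rng.shuffle(indices)
        # Accept the shuffle only if every position received a new index.
        if all(i != j for i, j in enumerate(indices)):
            return [items[j] for j in indices]


clients = ["client_a", "client_b", "client_c", "client_d"]
deranged = random_derangement_sketch(clients, random.Random(0))
```

Roughly one shuffle in e (about 2.72) is a derangement, so the loop usually needs only a few attempts.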
tests
minitest
Module Contents
- create_train_test_for_fedavg()
Verify the FedAvg aggregation algorithm using two clients by implementing the following function.
- create_train_test_for_fedadam()
Verify the FedAdam aggregation algorithm using two clients by implementing the following function.
- create_train_test_for_verification_module()
Verify the verification module using two clients by implementing the following function.
- create_train_test_for_timeout_module()
Verify the timeout module using two clients by implementing the following function.
- create_train_test_for_intermediate_connection_module()
Verify the intermediate connection module using two clients by implementing the following function.
- class TestTrainer_verification(methodName='runTest')
Bases:
create_train_test_for_verification_module()
Test case for verification module
- class TestTrainer_timeout(methodName='runTest')
Bases:
create_train_test_for_timeout_module()
Test case for timeout module
- class TestTrainer_intermediate(methodName='runTest')
Bases:
create_train_test_for_intermediate_connection_module()
Test case for intermediate client connections module
- class TestTrainer_fedavg(methodName='runTest')
Bases:
create_train_test_for_fedavg()
Test case for FedAvg
- class TestTrainer_fedadam(methodName='runTest')
Bases:
create_train_test_for_fedadam()
Test case for FedAdam
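The Bases entries above are function calls, which suggests that each create_train_test_for_... function returns a test case class that the concrete class then inherits from. The following is a minimal sketch of that class-factory pattern with the standard unittest module, under that assumption. The names create_test_for_sum and TestSmallSum are hypothetical and unrelated to FedERA's tests.

```python
import unittest


def create_test_for_sum(values, expected):
    """Build a TestCase class whose single test checks sum(values)."""

    class SumTest(unittest.TestCase):
        def test_sum(self):
            self.assertEqual(sum(values), expected)

    return SumTest


class TestSmallSum(create_test_for_sum([1, 2, 3], 6)):
    """Concrete test case that inherits test_sum from the generated base."""


# Run the generated test programmatically and collect the outcome.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestSmallSum)
result = unittest.TestResult()
suite.run(result)
```

Because the generated base class lives only inside the factory, a test runner discovers each test once, through the concrete subclass.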
misc
Module Contents
- get_config(action, action2, config_path='')
Get the configuration file as JSON.
- tester(configs, no_of_clients, late=None)
Return the tester for each test algorithm. The late parameter is used for the intermediate connection test.
Package Contents
- __version__ = '0.0.0'
scaffold
Module Contents
fedadam
Module Contents
fedavgm
Module Contents
fedadagrad
Module Contents
fedavg
Module Contents
mimelite
Module Contents
mime
Module Contents
feddyn
Module Contents
fedyogi
Module Contents
Contacts
Contact the FedERA development team through GitHub issues or email:
Development Team: federa.team@gmail.com