FedERA

FedERA is a highly dynamic and customizable framework that implements several functionalities over different federated learning algorithms, creating a plug-and-play architecture that can flexibly accommodate a wide range of use cases.

Check out the Overview of FedERA section for further information, including how to install the project.

Note

This project is under active development.

Contents

Overview of FedERA

Introduction

Federated Learning is a machine learning technique for training models on distributed data without sharing it. In traditional machine learning, large datasets must first be collected and sent to one location, where they are combined before the model is trained on them. This process raises privacy concerns, as sensitive personal data may become exposed. Federated learning addresses these concerns by keeping each user's data local while still allowing the powerful statistical analysis needed to create accurate models at scale.

FedAvg is one of the foundational blocks of federated learning. A single communication round of FedAvg includes:

  • Waiting for a number of clients to connect to a server (Step 0)

  • Sending the clients a global model (Step 1)

  • Training the model on locally available data (Step 2)

  • Sending the trained models back to the server (Step 3)

The server then averages the weights of the received models to compute a new aggregated model. This constitutes a single communication round, and several such rounds occur to train a model.

_images/fedavg_steps.png
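
As an illustration, the server-side averaging step can be written in a few lines of PyTorch. The following is a minimal sketch, not FedERA's own implementation (the framework's aggregation classes appear later under the algorithms package):

from collections import OrderedDict

import torch

def fedavg_aggregate(state_dicts):
    """Element-wise average of a list of client model state_dicts."""
    avg = OrderedDict()
    for key in state_dicts[0].keys():
        # Stack the corresponding tensor from every client and take the mean
        avg[key] = torch.stack([sd[key].float() for sd in state_dicts]).mean(dim=0)
    return avg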

Overview

FedERA is a highly dynamic and customizable framework that implements several functionalities over vanilla FedAvg, creating a plug-and-play architecture that can flexibly accommodate a wide range of use cases.

Federated Learning
_images/phase1.png


Establishing Connection between Server and Clients
_images/connection.png


Communication with clients
_images/communication.png


Fractional and random subsampling
  • The Client_Manager can be used to sample the already connected clients.

  • A minimum number of clients can be provided, in which case the Client_Manager will wait for that many clients to connect before returning a reference to them.

  • If a fraction is provided, the Client_Manager will return that fraction of the available clients.

  • The Client_Manager can sample the clients in connection order or in random order; a custom function can also be provided to determine the selection of clients, as illustrated in the sketch below.
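
As a sketch, the sampling API might be used as follows on the server side. The method names come from the ClientManager class listed in the API reference; the import path and the exact keyword semantics are assumptions:

from federa.server.src.client_manager import ClientManager

manager = ClientManager()

# Block until at least 3 clients are connected, waiting up to 60 seconds
manager.wait_for(minimum_clients=3, timeout=60)

# Take half of the available clients in connection order...
clients = manager.select(fraction=0.5)

# ...or sample the same fraction in random order
clients = manager.random_select(fraction=0.5)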

Various modules in FedERA

FedERA is composed of four modules, each building upon the last.

  1. Verification module. Before aggregating, the server will perform a special verification round to determine which models to accommodate during aggregation.

  2. Timeout module. Instead of waiting indefinitely for a client to finish training, the server can issue a timeout; when it expires, the client stops training and returns its results, even if it has not completed all epochs.

  3. Intermediate client connections module. New clients will be able to join the server anytime and may even be included in a round that is already live.

  4. Carbon emissions tracking module. The framework will be able to track the carbon emissions of the clients during the training process.

Verification module
  • After the server receives the trained weights, it aggregates all of them to form the new model. However, the selection of models for aggregation can be modified.

  • Before aggregation, the server passes the models to a Verification module, which then uses a predefined procedure to generate scores for models, and then returns only those models that have performed above a defined threshold.

  • The Verification module can be easily customized.
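
A possible scoring procedure is sketched below: each received model is evaluated on a held-out validation set, and only the models clearing the threshold are kept. This is illustrative only; the actual verify() in server/src/verification.py may score models differently:

import torch

def verify_models(models, valloader, threshold=0.0, device="cpu"):
    """Keep only the models whose validation accuracy clears the threshold."""
    accepted = []
    for model in models:
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for images, labels in valloader:
                images, labels = images.to(device), labels.to(device)
                preds = model(images).argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        if correct / total >= threshold:
            accepted.append(model)
    return accepted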

Steps in the Verification module
_images/verification_steps.png


Modified Federated Learning architecture
_images/verification_1.png _images/verification_2.png


Timeout module
  • In real-world scenarios, clients often cannot keep training indefinitely. Therefore, a timeout functionality has been implemented.

  • The server can specify a timeout parameter in the Train order configuration. The client then trains until the timeout occurs and returns its results, following the pattern sketched below.
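
Internally this amounts to a deadline check inside the training loop, following the same pattern as the training functions shown later in this documentation. A minimal sketch (the real functions also handle optimizers, devices, and logging):

import time

def train_until_deadline(net, trainloader, epochs, timeout=None):
    deadline = time.time() + timeout if timeout else None
    for _ in range(epochs):
        for images, labels in trainloader:
            ...  # one local optimization step
        # Stop early once the server-issued deadline has passed
        if deadline and time.time() >= deadline:
            print("deadline occurred.")
            break
    return net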

Steps in the Timeout module
_images/timeout.png

Intermediate client connections module
  • Even in the middle of a communication round, the server can accept new client connections, incorporate them into the Client_Manager, and include them in the ongoing round.

  • The server can be easily configured to allow or reject new connections during different parts of Federated Learning.

  • Safeguards have been implemented to notify the server whenever a client disconnects.

Carbon emissions tracking module

In FedERA, the CodeCarbon package is used to estimate the carbon emissions generated by clients during training. CodeCarbon is a Python package that estimates the carbon emissions associated with running software.

Tested on

FedERA has been extensively tested on and works with the following devices:

  • Intel CPUs

  • Nvidia GPUs

  • Nvidia Jetson

  • Raspberry Pi

  • Intel NUC

With FedERA, the server and clients can run on separate devices or on a single device, for example in different terminals or via multiprocessing, as sketched below.

_images/tested.png
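
For instance, a small single-machine federation could be assembled with Python's multiprocessing module. This is a sketch only: server_start and client_start are the entry points listed in the API reference, but the import paths and the configuration keys shown here are assumptions based on the argument tables later in this documentation.

import multiprocessing as mp

# Assumed import paths, based on the package layout in the API reference
from federa.server.src.server import server_start
from federa.client.src.client import client_start

if __name__ == "__main__":
    # Hypothetical minimal configurations; see the argument tables below
    server_config = {"algorithm": "fedavg", "clients": 2, "rounds": 1}
    client_config = {"server_ip": "localhost:8214", "device": "cpu"}

    server = mp.Process(target=server_start, args=(server_config,))
    server.start()

    clients = [mp.Process(target=client_start, args=(client_config,)) for _ in range(2)]
    for c in clients:
        c.start()
    for c in clients:
        c.join()
    server.join()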

Installation

Install the package

Follow this procedure to prepare the environment and install FedERA:

  1. Install Python 3.8 (any version >=3.6 and <=3.9 is supported).

See the venv installation guide for details.

  2. Create a new environment for the project.
    1. Using venv
      $ python3 -m venv env
      
    2. Using conda
      $ conda create -n env python=3.9
      
  3. Activate the virtual environment.

    1. Virtual Environment
      $ source env/bin/activate
      
    2. Conda Environment
      $ conda activate env
      
  4. Install the FedERA package.

    1. Install the stable version with pip:

      $ pip install feder==$version$
      
    2. Install the latest version from GitHub:

      1. Clone the FedERA repository:

        $ git clone https://github.com/anupamkliv/FedERA.git
        $ cd FedERA
        
      2. Install dependencies:

        $ pip install -r requirements.txt
        

FedERA with Docker

Follow this procedure to build a Docker image of FedERA:

Note

The purpose of the Docker edition of FedERA is to provide an isolated environment complete with the prerequisites to run. Once the execution is finished, the container can be eliminated, and the computation results will be accessible in a directory on the local host.

  1. Install Docker on all nodes in the federation.

See the Docker installation guide for details.

  2. Check that Docker is running properly with the Hello World command:

    $ docker run hello-world
    Hello from Docker!
    This message shows that your installation appears to be working correctly.
    ...
    ...
    ...
    
  3. Build the Docker image of FedERA:

    $ docker build -t federa .
    
  4. Run the Docker image of FedERA:

    $ docker run federa
    

Tutorials

FedERA allows you to do federated learning in real time on various supported edge devices, including Intel CPUs, Nvidia GPUs, Nvidia Jetson, Raspberry Pi, and Intel NUC. FedERA provides modular tools and standard algorithms to simplify real-time federated learning using the gRPC framework.

Running Server and Client

Step-by-step guide on running the server and clients on the same or different devices.

How to Customize the Federated Learning Algorithm?

Step-by-step guide on how to customize the federated learning algorithm.

How to add Custom Dataset?

Step-by-step guide on how to add a custom dataset.

How to add Custom Model?

Step-by-step guide on how to add a custom model.

How to use different Data Distributions?

Step-by-step guide on how to use different data distributions while training.

How to get Carbon Footprint?

Step-by-step guide on how to get carbon footprint of the training process.

How to use Encryption?

Step-by-step guide on how to use encryption in the training process.

Running the Server and Clients

Starting the Server

The server is started by running the following command in the root directory of the framework:

python -m federa.server.start_server
Arguments that can be passed to the server are:
Server Configuration Options

Argument      Description                                            Default
-----------   ----------------------------------------------------   ----------------
algorithm     specifies the aggregation algorithm                    fedavg
clients       specifies number of clients selected per round         1
fraction      specifies fraction of clients selected                 1
rounds        specifies total number of rounds                       1
model_path    specifies initial server model path                    initial_model.pt
epochs        specifies client epochs per round                      1
accept_conn   determines if connections accepted after FL begins     1
verify        specifies if verification module runs before rounds    0
threshold     specifies minimum verification score                   0
timeout       specifies client training time limit per round         None
resize_size   specifies dataset resize dimension                     32
batch_size    specifies dataset batch size                           32
net           specifies network architecture                         LeNet
dataset       specifies dataset name                                 MNIST
niid          specifies data distribution among clients              1
carbon        specifies if carbon emissions tracked at client side   0
encryption    specifies whether to use SSL encryption or not         0
server_key    specifies path to server key certificate               server-key.pem
server_cert   specifies path to server certificate                   server.pem
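
For example, assuming each argument maps to a command-line flag of the same name, a 10-round FedAvg run with 2 clients per round on MNIST could be launched as:

python -m federa.server.start_server --algorithm fedavg --clients 2 --rounds 10 --dataset MNIST --net LeNet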

Starting the Clients

The clients are started by running the following command in the root directory of the framework:

python -m federa.client.start_client
Arguments that can be passed to the clients are:
Client Configuration Options

Argument     Description                                                Default
----------   --------------------------------------------------------   --------------
server_ip    specifies server IP address                                localhost:8214
device       specifies device                                           cpu
encryption   specifies whether to use SSL encryption or not             0
ca           specifies path to CA certificate                           ca.pem
wait_time    specifies time to wait before reconnecting to the server   30
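
For example, a client on another machine could point at the server's address as follows (again assuming the flag names match the table; the IP address is illustrative):

python -m federa.client.start_client --server_ip 192.168.0.10:8214 --device cpu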

Federated Learning Algorithms

The implementation of federated learning algorithms in FedERA consists of two components: the training part on the client side and the aggregation part on the server side. The training functions are implemented in the net_lib.py file in the client/src directory, while the aggregation functions are located in separate files within the algorithms folder in the server/src directory.

The algorithms currently implemented in FedERA are:

  • FedAvg

  • FedDyn

  • FedAdam

  • FedAdagrad

  • Scaffold

  • FedAvgM

  • Mime

  • Mimelite

  • FedYogi

Adding a new algorithm to FedERA

To add a new algorithm to FedERA, you need to implement the training function on the client side and the aggregation function on the server side. The training function should be implemented in the net_lib.py file in the client/src directory, and the aggregation function in a new file in the algorithms folder in the server/src directory, as outlined in the skeleton below.
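
In outline, the two halves look like the following skeleton. The names train_mynewalgo and mynewalgo are placeholders, and the argument lists simply mirror the fedavg-style functions shown below:

# client/src/net_lib.py -- the training half (placeholder)
def train_mynewalgo(net, trainloader, epochs, deadline=None):
    net.train()
    for _ in range(epochs):
        for images, labels in trainloader:
            ...  # one local optimization step for the new algorithm
    return net

# server/src/algorithms/mynewalgo.py -- the aggregation half (placeholder)
class mynewalgo():
    def __init__(self, config):
        self.algorithm = "MyNewAlgo"

    def aggregate(self, server_model_state_dict, state_dicts):
        ...  # combine the client state_dicts into a new server model
        return server_model_state_dict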

Implementing the training function

The training function should be implemented in the net_lib.py file, in a fashion similar to the following example for the MimeLite algorithm:

def train_mimelite(net, state, trainloader, epochs, deadline=None):
    # In the case of MimeLite, the control variate is simply a state, as in the momentum method
    x = deepcopy(net)

    criterion = torch.nn.CrossEntropyLoss()
    lr = 0.001
    momentum = 0.9
    net.train()

    for _ in tqdm(range(epochs)):
        for images, labels in trainloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            loss = criterion(net(images), labels)

            # Compute the gradient of the loss with respect to net's parameters
            grads = torch.autograd.grad(loss, net.parameters())
            # Update net's parameters using the gradients and the server state
            with torch.no_grad():
                for param, grad, s in zip(net.parameters(), grads, state):
                    param.data = param.data - lr * ((1 - momentum) * grad.data + momentum * s.data)

        if deadline:
            current_time = time.time()
            if current_time >= deadline:
                print("deadline occurred.")
                break

    # Compute the gradient with respect to the received model (x) using the whole dataset
    data = DataLoader(trainloader.dataset, batch_size=len(trainloader) * trainloader.batch_size, shuffle=True)
    for images, labels in data:
        images, labels = images.to(DEVICE), labels.to(DEVICE)
        output = x(images)
        loss = criterion(output, labels)  # Loss of x's output against the labels
        gradient_x = torch.autograd.grad(loss, x.parameters())

    return net, gradient_x

After making these changes in the net_lib.py file, the client_lib.py file, located in the client/src directory, also needs to be updated to dispatch to the newly defined algorithm. The following code snippet shows the train function that needs to be updated in the client_lib.py file:

def train(train_order_message):
    data_bytes = train_order_message.modelParameters
    data = torch.load( BytesIO(data_bytes), map_location="cpu" )
    model_parameters, control_variate, control_variate2 = data['model_parameters'], data['control_variate'], data['control_variate2']

    config_dict_bytes = train_order_message.configDict
    config_dict = json.loads( config_dict_bytes.decode("utf-8") )
    carbon_tracker = config_dict["carbon_tracker"]

    model = get_net(config= config_dict)
    model.load_state_dict(model_parameters)
    model = model.to(device)
    epochs = config_dict["epochs"]
    if config_dict["timeout"]:
        deadline = time.time() + config_dict["timeout"]
    else:
        deadline = None

    #Run code carbon if the carbon-tracker flag is True
    if (carbon_tracker==1):
        tracker = OfflineEmissionsTracker(country_iso_code="IND", output_dir = save_dir_path)
        tracker.start()

    trainloader, testloader, _ = load_data(config_dict)
    print("training started")
    if (config_dict['algorithm'] == 'mimelite'):
        model, control_variate = train_mimelite(model, control_variate, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'scaffold'):
        model, control_variate = train_scaffold(model, control_variate, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'mime'):
        model, control_variate = train_mime(model, control_variate, control_variate2, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'fedavg'):
        model = train_fedavg(model, trainloader, epochs, deadline)
    elif (config_dict['algorithm'] == 'feddyn'):
        model = train_feddyn(model, trainloader, epochs, deadline)
    else:
        model = train_model(model, trainloader, epochs, deadline)
    print("training finished")

    if (carbon_tracker==1):
        emissions: float = tracker.stop()
        print(f"Emissions: {emissions} kg")

    myJSON = json.dumps(config_dict)
    json_path = save_dir_path + "/config.json"
    with open(json_path, "w") as jsonfile:
        jsonfile.write(myJSON)
    json_path = "config.json"
    with open(json_path, "w") as jsonfile:
        jsonfile.write(myJSON)

    trained_model_parameters = model.state_dict()
    #Create a dictionary where model_parameters and control_variate are stored which needs to be sent to the server
    data_to_send = {}
    data_to_send['model_parameters'] = trained_model_parameters
    data_to_send['control_variate'] = control_variate #If there is no control_variate, this will become None
    buffer = BytesIO()
    torch.save(data_to_send, buffer)
    buffer.seek(0)
    data_to_send_bytes = buffer.read()

    print("train eval")
    train_loss, train_accuracy = test_model(model, testloader)
    response_dict = {"train_loss": train_loss, "train_accuracy": train_accuracy}
    response_dict_bytes = json.dumps(response_dict).encode("utf-8")

    train_response_message = TrainResponse(
        modelParameters = data_to_send_bytes,
        responseDict = response_dict_bytes)

    save_model_state(model)
    return train_response_message
Implementing the aggregation function

The aggregation function should be implemented within a class in a new file in the algorithms folder in the server/src directory. The following code snippet shows the aggregation function for the MimeLite algorithm as defined in the mimelite.py file:

class mimelite():

    def __init__(self, config):
        self.algorithm = "MimeLite"
        self.lr = 1.0
        self.momentum = 0.9

    def aggregate(self,server_model_state_dict, optimizer_state, state_dicts, gradients_x):

        keys = server_model_state_dict.keys() #List of keys in a state_dict

        avg_y = OrderedDict() #This will be our new server_model_state_dict
        for key in keys:
            current_key_tensors = [state_dict[key] for state_dict in state_dicts]
            current_key_sum = functools.reduce( lambda accumulator, tensor: accumulator + tensor, current_key_tensors )
            current_key_average = current_key_sum / len(state_dicts)
            avg_y[key] = current_key_average

        #Average all the gradient_x in gradients_x
        avg_grads = []
        for i in range(len(gradients_x[0])):
            #Average all the i'th element of gradient_x present in the gradients_x
            current_tensors = [gradient_x[i] for gradient_x in gradients_x]
            current_sum = functools.reduce(lambda accumulator, tensor: accumulator + tensor, current_tensors)
            current_average = current_sum / len(gradients_x)
            avg_grads.append(current_average)

        for state, grad in zip(optimizer_state, avg_grads):
            state.data = self.momentum * state.data + (1 - self.momentum) * grad.data

        return avg_y, optimizer_state

Datasets

The datasets used by FedERA are fetched from torchvision.datasets. As of now, FedERA supports the following datasets:

  • MNIST

  • FashionMNIST

  • CIFAR10

  • CIFAR100

Adding support for new datasets

There are two methods for incorporating support for new datasets in FedERA. One involves utilizing torchvision.datasets, while the other entails implementing support for a custom dataset.

Adding support for a dataset available in torchvision.datasets

The torchvision.datasets package consists of popular datasets used in computer vision. These datasets are downloaded and cached automatically, and they are subclasses of torch.utils.data.Dataset, i.e. they share the same API. This makes it easy to incorporate support for new datasets in FedERA: all that is required is to add a few lines of code to the get_data function in the get_data.py file.

def get_data(config):
    # If the dataset is not custom, create a dataset folder
    if config['dataset'] != 'CUSTOM':
        dataset_path = "client_dataset"
        if not os.path.exists(dataset_path):
            os.makedirs(dataset_path)

    # Get the train and test datasets for each supported dataset
    if config['dataset'] == 'MNIST':
        # Apply transformations to the images
        apply_transform = transforms.Compose([transforms.Resize(config["resize_size"]), transforms.ToTensor()])
        # Download and load the trainset
        trainset = datasets.MNIST(root='client_dataset/MNIST', train=True, download=True, transform=apply_transform)
        # Download and load the testset
        testset = datasets.MNIST(root='client_dataset/MNIST', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'FashionMNIST':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.FashionMNIST(root='client_dataset/FashionMNIST', train=True, download=True, transform=apply_transform)
        testset = datasets.FashionMNIST(root='client_dataset/FashionMNIST', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'CIFAR10':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.CIFAR10(root='client_dataset/CIFAR10', train=True, download=True, transform=apply_transform)
        testset = datasets.CIFAR10(root='client_dataset/CIFAR10', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'CIFAR100':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.CIFAR100(root='client_dataset/CIFAR100', train=True, download=True, transform=apply_transform)
        testset = datasets.CIFAR100(root='client_dataset/CIFAR100', train=False, download=True, transform=apply_transform)
    elif config['dataset'] == 'CUSTOM':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        # Load the custom dataset
        trainset = customDataset(root='client_custom_dataset/CUSTOM/train', transform=apply_transform)
        testset = customDataset(root='client_custom_dataset/CUSTOM/test', transform=apply_transform)
    else:
        # Raise an error if an unsupported dataset is specified
        raise ValueError("Unsupported dataset type: {}".format(config['dataset']))

    # Return the train and test datasets
    return trainset, testset

For example, to add support for the STL10 dataset, the following lines of code can be added to the get_data function:

    elif config['dataset'] == 'STL10':
        apply_transform = transforms.Compose([transforms.Resize(config['resize_size']), transforms.ToTensor()])
        trainset = datasets.STL10(root='client_dataset/STL10', split='train', download=True, transform=apply_transform)
        testset = datasets.STL10(root='client_dataset/STL10', split='test', download=True, transform=apply_transform)
Adding support for a custom dataset

In order to incorporate support for a custom dataset, the train and test sets for the dataset must be included in the train and test folders, respectively, within the client/client_custom_dataset/CUSTOM/ directory, with the data stored as .npy files. The customDataset class, which loads the custom data, is defined in the get_data.py file and can be changed to suit the requirements of the custom dataset. It is a subclass of torch.utils.data.Dataset and has the same API as the datasets in torchvision.datasets. The following code snippet shows the customDataset class:

class customDataset(data.Dataset):
    def __init__(self, root, transform=None):
        """
        Custom dataset class for loading image and label data from a folder of .npy files.
        Args:
            root (str): Path to the folder containing the .npy files.
            transform (callable, optional): A function/transform that takes in a PIL image
                                            and returns a transformed version, e.g. transforms.RandomCrop.
        """
        self.root = root
        self.samples = sample_return(root)
        self.transform = transform

    def __getitem__(self, index):
        """
        Retrieves a sample from the dataset at the given index.
        Args:
            index (int): Index of the sample to retrieve.
        Returns:
            img (PIL.Image): The image data.
            label (int): The label for the image data.
        """
        img, label = self.samples[index]
        # Load the stored numpy array and convert it to a PIL image
        img = np.load(img)
        img = Image.fromarray(img)

        if self.transform is not None:
            img = self.transform(img)

        return img, label

    def __len__(self):
        return len(self.samples)
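
As an illustration, image arrays could be exported to the expected .npy layout with a helper like the one below. This is a sketch: the exact pairing of files with labels is determined by sample_return in get_data.py, so the per-label subfolder convention used here is an assumption to adapt as needed.

import os

import numpy as np

def export_npy(images, labels, root):
    """Save each image as an individual .npy file under a per-label subfolder (assumed layout)."""
    for i, (img, label) in enumerate(zip(images, labels)):
        label_dir = os.path.join(root, str(label))
        os.makedirs(label_dir, exist_ok=True)
        np.save(os.path.join(label_dir, f"{i}.npy"), np.asarray(img, dtype=np.uint8))

# e.g. export_npy(train_images, train_labels, "client/client_custom_dataset/CUSTOM/train")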

Models

The models currently implemented in the framework are:

  • LeNet-5

  • ResNet-18

  • ResNet-50

  • VGG-16

  • AlexNet

The server_lib.py file contains the implementation of the deep learning models for the server, while the net.py file contains the implementation of these models for the client. These models are either created by inheriting from torch.nn.Module or are imported from torchvision.models.

Adding support for a new model

There are two ways to incorporate support for new models in FedERA. One involves creating a new class that inherits from torch.nn.Module, and the other involves importing a model from torchvision.models. The first method is more flexible and allows for more customization, while the second is easier to implement and recommended for beginners.

Adding support for a new model by inheriting from torch.nn.Module

To add support for a new model by inheriting from torch.nn.Module, the following steps need to be followed:

  1. Create a new class that inherits from torch.nn.Module, define the model, and add it to the server_lib.py and net.py files. The code for LeNet is given below as an example:

class LeNet(nn.Module):
    def __init__(self, in_channels=1, num_classes=10):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 6, kernel_size=5)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(400, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)
        self.relu = nn.ReLU()
        self.logSoftmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.pool2(x)
        x = x.view(-1, 400)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.logSoftmax(x)
        return x
  2. The models implemented in the server_lib.py and net.py files are retrieved via the get_net function defined in both files. This function takes the configuration and returns the corresponding model. To add support for a new model, the model's name needs to be added to the get_net function in both files and the appropriate changes made. The client-side get_net function is given below as an example:

def get_net(config):
    if config["net"] == 'LeNet':
        if config['dataset'] in ['MNIST', 'FashionMNIST', 'CUSTOM']:
            net = LeNet(in_channels=1, num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = LeNet(in_channels=3, num_classes=10)
        else:
            net = LeNet(in_channels=3, num_classes=100)
    if config["net"] == 'resnet18':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.resnet18(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.resnet18(num_classes=10)
        else:
            net = models.resnet18(num_classes=100)
    if config["net"] == 'resnet50':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.resnet50(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.resnet50(num_classes=10)
        else:
            net = models.resnet50(num_classes=100)
    if config["net"] == 'vgg16':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.vgg16(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.vgg16(num_classes=10)
        else:
            net = models.vgg16(num_classes=100)
    if config['net'] == 'AlexNet':
        if config['dataset'] in ['MNIST', 'FashionMNIST']:
            net = models.alexnet(num_classes=10)
        elif config['dataset'] == 'CIFAR10':
            net = models.alexnet(num_classes=10)
        else:
            net = models.alexnet(num_classes=100)
    return net
Adding support for a new model by importing from torchvision.models

To add support for a new model by importing from torchvision.models, import the model from torchvision.models in the server_lib.py and net.py files and adjust the get_net function accordingly. The code that needs to be added to the get_net function to use the ResNet-34 model is given below as an example:

if config["net"] == 'resnet38':
    if config['dataset'] in ['MNIST', 'FashionMNIST']:
        net = models.resnet38(num_classes=10)
    elif config['dataset'] == 'CIFAR10':
        net = models.resnet38(num_classes=10)
    else:
        net = models.resnet38(num_classes=100)

Data Distribution

FedERA allows training with either an IID or a non-IID data distribution. The distribution is specified with the niid argument: when it is set to 1, the data distribution is IID, while values from 2 to 5 select different non-IID distributions. The non-IID distributions are generated as follows:

def data_distribution(config, trainset):
    labels = []
    base_dir = os.getcwd()
    storepath = os.path.join(base_dir, 'Distribution/', config['dataset']+'/')
    seed = 10
    random.seed(seed)
    num_users = 5

    #Calculate the number of samples present per class
    for i in range(len(trainset)):
        labels.append(trainset[i][1])
    unique_labels = np.unique(np.array(labels))
    label_index_list = {}
    for key in unique_labels:
        label_index_list[key] = []
    for index, label in enumerate(labels):
        label_index_list[label].append(index)
    num_classes = len(unique_labels)

    #Calculate the value of the probability distribution. For K=1, it will be iid distribution
    K = config['niid']
    if (K==1):
        q_step = (1 - (1/num_classes))
    else:
        q_step = (1 - (1/num_classes))/(K-1)

    #Shuffle the index position for all classes
    for i in range(len(label_index_list)):
        random.shuffle(label_index_list[i])

    #Generate the different non-iid distribution. Data_presence_indicator will help to reduce the number of classes among the clients as the non-iid increases
    for j in range(K):
        dist = np.random.uniform(q_step, (1+j)*q_step, (num_classes, num_users))
        if j != 0:
            data_presence_indicator = np.random.choice([0, 1], (num_classes, num_users), p=[j*q_step, 1-(j*q_step)])
            if len(np.where(np.sum(data_presence_indicator, axis=0) == 0)[0])>0:
                for i in np.where(np.sum(data_presence_indicator, axis=0) == 0)[0]:
                    zero_array = data_presence_indicator[:,i]
                    zero_array[np.random.choice(len(zero_array),1)] =1
                    data_presence_indicator[:,i] = zero_array
            dist = np.multiply(dist,data_presence_indicator)
        psum = np.sum(dist, axis=1)
        for i in range(dist.shape[0]):
            dist[i] = dist[i]*len(label_index_list[i])/(psum[i]+0.00001)
        dist = np.floor(dist).astype(int)

        # If any client does not get any data then this logic helps to allocate the required samples among the clients
        gainers = list(np.where(np.sum(dist, axis=0) != 0))[0]
        if len(gainers) < num_users:
            losers = list(np.where(np.sum(dist, axis=0) == 0))[0]
            donors = np.random.choice(gainers, len(losers))
            for index, donor in enumerate(donors):
                avail_digits = np.where(dist[:,donor] != 0)[0]
                for digit in avail_digits:
                    transfer_frac = np.random.uniform(0.1,0.9)
                    num_transfer = int(dist[digit, donor]*transfer_frac)
                    dist[digit, donor] = dist[digit, donor] - num_transfer
                    dist[digit, losers[index]] = num_transfer

        #Logic to check if the summation of all the samples among the clients is equal to the total number of samples present for that class. If not it will adjust.
        for num in range(num_classes):
            while dist[num].sum() != len(label_index_list[num]):
                index = random.randint(0,num_users-1)
                if dist[num].sum() < len(label_index_list[num]):
                    dist[num][index]+=1
                else:
                    dist[num][index]-=1

        #Division of samples number among the clients
        split = [[] for i in range(num_classes)]
        for num in range(num_classes):
            start = 0
            for i in range(num_users):
                split[num].append(label_index_list[num][start:start+dist[num][i]])
                start = start+dist[num][i]

        #Division of actual data points among the clients.
        datapoints = [[] for i in range(num_users)]
        class_histogram = [[] for i in range(num_users)]
        class_stats= [[] for i in range(num_users)]
        for i in range(num_users):
            for num in range(num_classes):
                datapoints[i] += split[num][i]
                class_histogram[i].append(len(split[num][i]))
                if(len(split[num][i])==0):
                    class_stats[i].append(0)
                else:
                    class_stats[i].append(1)

        #Store the dataset division in the folder
        if not os.path.exists(storepath):
            os.makedirs(storepath)
        file_name = 'data_split_niid_'+ str(K)+'.pt'

        torch.save({'datapoints': datapoints, 'histograms': class_histogram, 'class_statitics': class_stats}, storepath + file_name)
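
The saved split can then be inspected directly. For example, once the MNIST split for niid=1 has been generated:

import torch

split = torch.load("Distribution/MNIST/data_split_niid_1.pt")
datapoints = split["datapoints"]        # per-client lists of sample indices
histograms = split["histograms"]        # per-client class histograms
stats = split["class_statitics"]        # per-client 0/1 class-presence flags (key spelling as in the code above)

for client_id, indices in enumerate(datapoints):
    print(f"client {client_id}: {len(indices)} samples, histogram {histograms[client_id]}")
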
Visualizing the non-IID data distribution for the MNIST dataset
Classwise distribution of samples among the clients for different non-IID distributions
_images/class_stats_0.png

Classwise distribution of samples among the clients for non-IID distribution 1


_images/class_stats_1.png

Classwise distribution of samples among the clients for non-IID distribution 2


_images/class_stats_2.png

Classwise distribution of samples among the clients for non-IID distribution 3


_images/class_stats_3.png

Classwise distribution of samples among the clients for non-IID distribution 4


_images/class_stats_4.png

Classwise distribution of samples among the clients for non-IID distribution 5


Samplewise distribution of samples among the clients for different non-IID distributions
_images/sample_stats_0.png

Samplewise distribution of samples among the clients for non-IID distribution 1


_images/sample_stats_1.png

Samplewise distribution of samples among the clients for non-IID distribution 2


_images/sample_stats_2.png

Samplewise distribution of samples among the clients for non-IID distribution 3


_images/sample_stats_3.png

Samplewise distribution of samples among the clients for non-IID distribution 4


_images/sample_stats_4.png

Samplewise distribution of samples among the clients for non-IID distribution 5

Carbon Emissions

CodeCarbon is a Python package that provides an estimation of the carbon emissions associated with software code. It can be integrated into software development workflows and offers real-time feedback on the environmental impact of the code being developed. CodeCarbon helps developers and organizations become more environmentally conscious by optimizing their code and making better choices regarding the hardware and infrastructure used to run it. It enables companies to achieve their sustainability goals and demonstrate their commitment to reducing their environmental impact.

To estimate the carbon emissions generated by clients during training, CodeCarbon is used in the client_lib.py file, located in the client/src/ directory. By default, the client's location is set to India, but it can be modified to reflect the client's actual location. The following code snippet illustrates how CodeCarbon is used in the client_lib.py file:

# Run CodeCarbon if the carbon-tracker flag is True
if (carbon_tracker==1):
    tracker = OfflineEmissionsTracker(country_iso_code="IND", output_dir = save_dir_path)
    tracker.start()

trainloader, testloader, _ = load_data(config_dict)
print("training started")
if (config_dict['algorithm'] == 'mimelite'):
    model, control_variate = train_mimelite(model, control_variate, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'scaffold'):
    model, control_variate = train_scaffold(model, control_variate, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'mime'):
    model, control_variate = train_mime(model, control_variate, control_variate2, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'fedavg'):
    model = train_fedavg(model, trainloader, epochs, deadline)
elif (config_dict['algorithm'] == 'feddyn'):
    model = train_feddyn(model, trainloader, epochs, deadline)
else:
    model = train_model(model, trainloader, epochs, deadline)
print("training finished")

if (carbon_tracker==1):
    emissions: float = tracker.stop()
    print(f"Emissions: {emissions} kg")

Encryption

In the FedERA framework, encryption plays a crucial role in ensuring secure communication between the client and server during the Federated Learning process. This section provides guidance on generating and configuring the necessary certificates for TLS/SSL encryption.

TLS Basics

To understand the encryption process, it’s essential to grasp the fundamentals of TLS/SSL and chains of trust. TLS/SSL operates based on a transitive trust model, where trust in a certificate authority (CA) extends to the certificates it generates. Web browsers and operating systems have a “Trusted Roots” certificate store, automatically trusting certificates from public certificate authorities such as Let’s Encrypt or GoDaddy.

In the case of FedERA, we establish our own CA and need to inform the client about the CA certificate for trust verification. Additionally, the server certificate must contain the exact server name the client connects to for validation.

Generate Certificates

For the purpose of this example, we will set up a basic PKI infrastructure using CloudFlare's CFSSL toolset, specifically the cfssl and cfssljson tools, which can be downloaded from the CFSSL project page.

The ssl directory contains configuration files that can be modified, but for demonstration purposes, they can also be used as-is.

Generate CA Certificate and Config

To generate the CA certificate and configuration, navigate to the ssl directory and run the following command:

$ cd FedERA/ssl
$ cfssl gencert -initca ca-csr.json | cfssljson -bare ca

This command generates the ca.pem and ca-key.pem files. The ca.pem file is used by both the client and server for mutual verification.

Generate Server and Client Certificates
Server Certificate

To generate the server certificate and key pair, run the following command in the ssl directory:

$ cd FedERA/ssl
$ cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -hostname='127.0.0.1,localhost' server-csr.json | cfssljson -bare server

This command creates the server certificate and key pair to be used by the server during TLS/SSL encryption. Note that you can modify the hostname parameter to match the name or IP address of the server on your network.

Client Certificate

To generate the client certificate and key pair, use the following command in the ssl directory:

$ cd FedERA/ssl
$ cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json client-csr.json | cfssljson -bare client

When generating the client certificate and key pair, a warning message may appear regarding the absence of a “hosts” field. This warning is expected and acceptable since the client certificate is only used for client identification, not server identification.

TLS Server Identification and Encryption

In FedERA, the client trusts the certificate authority certificate, which subsequently enables trust in the server certificate. This is similar to how web browsers handle certificates, where pre-installed public certificate authority certificates establish trust.

For one-way trust verification (client verifies server identity but not vice versa), the server does not necessarily need to present the CA certificate as part of its certificate chain. The server only needs to present enough of the certificate chain for the client to trace it back to a trusted CA certificate.

In the FedERA framework, the gRPC server can be configured for SSL using the following code snippet:
On the server side:

if configurations['encryption'] == 1:
    # Load the server's private key and certificate
    keyfile = configurations['server_key']
    certfile = configurations['server_cert']
    private_key = bytes(open(keyfile).read(), 'utf-8')
    certificate_chain = bytes(open(certfile).read(), 'utf-8')
    # Create the SSL/TLS credentials object
    server_credentials = ssl_server_credentials([(private_key, certificate_chain)])
    server.add_secure_port('localhost:8214', server_credentials)

On the client side:

if config["encryption"] == 1:
    ca_cert = 'ca.pem'
    root_certs = bytes(open(ca_cert).read(), 'utf-8')
    credentials = grpc.ssl_channel_credentials(root_certs)
    # Create a new gRPC channel to the server
    channel = grpc.secure_channel(ip_address, options=[
        ('grpc.max_send_message_length', -1),
        ('grpc.max_receive_message_length', -1)
        ], credentials=credentials)
Acknowledgments

This code and information were developed with the help of the repository jottoekke/python-grpc-ssl, which provided valuable guidance in implementing the encryption functionality.

Contribution to FedERA

Reporting bugs

To report bugs or request features, we use GitHub issues. If you come across a bug or have an idea for a feature, don't hesitate to open an issue.

If you encounter any problems while using this software package, please submit a ticket to the Bug Tracker. You can also post pull requests or feature requests.

Contributing to FedERA

If you wish to contribute to the project by submitting code, you can do so by creating a Pull Request. By contributing code, you agree that your contributions will be licensed under Apache License, Version 2.0.

We encourage you to contribute to the enhancement of FedERA or the implementation of existing FL methods within FedERA. The recommended method for contributing to FedERA is to fork the main repository on GitHub, clone it, and develop on a branch. Follow these steps:

  1. Click on “Fork” to fork the project repository.

  2. Clone your forked repository from your GitHub account to your local machine:

    $ git clone https://github.com/anupamkliv/FedERA.git
    

and then navigate to the FedERA directory using the command

    $ cd FedERA
    
  3. Create a new branch to save your changes using the command

    $ git checkout -b my-feature
    
  4. Develop the feature on your branch and use the command

    $ git add modified_files
    

    followed by

    $ git commit
    

    to save your changes.

Pull Request Checklist

  • Please follow the existing file structure when adding new features, or create new files where needed.

Reference

API Reference

This page contains auto-generated API reference documentation.

federa

client
src
ClientConnection_pb2

Generated protocol buffer code.

Module Contents
_sym_db
DESCRIPTOR
_SERVERMESSAGE
_CLIENTMESSAGE
_TRAINORDER
_TRAINRESPONSE
_EVALORDER
_EVALRESPONSE
_SETPARAMSORDER
_SETPARAMSRESPONSE
_DISCONNECTORDER
ServerMessage
ClientMessage
TrainOrder
TrainResponse
EvalOrder
EvalResponse
SetParamsOrder
SetParamsResponse
DisconnectOrder
_CLIENTCONNECTION
ClientConnection_pb2_grpc

Client and server classes corresponding to protobuf-defined services.

Module Contents

ClientConnectionStub

Missing associated documentation comment in .proto file.

ClientConnectionServicer

Missing associated documentation comment in .proto file.

ClientConnection

Missing associated documentation comment in .proto file.

add_ClientConnectionServicer_to_server(servicer, server)

class ClientConnectionStub(channel)

Missing associated documentation comment in .proto file.

class ClientConnectionServicer

Missing associated documentation comment in .proto file.

Connect(request_iterator, context)

Missing associated documentation comment in .proto file.

add_ClientConnectionServicer_to_server(servicer, server)
class ClientConnection

Missing associated documentation comment in .proto file.

static Connect(request_iterator, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None)
client
Module Contents

client_start(config)

client_start(config)
client_lib
Module Contents

evaluate(eval_order_message, device)

train(train_order_message, device)

set_parameters(set_parameters_order_message, device)

save_model_state(model)

plot_emission()

fl_timestamp

save_dir_path

fl_timestamp
save_dir_path
evaluate(eval_order_message, device)
train(train_order_message, device)
set_parameters(set_parameters_order_message, device)
save_model_state(model)
plot_emission()
data_utils
Module Contents

distributionDataloader

class distributionDataloader(config, trainset, data_path, clientID=0, aug=False)

Bases: torch.utils.data.Dataset

__len__()
__getitem__(index)
distribution
Module Contents

data_distribution(config, trainset)

data_distribution(config, trainset)
get_data
Module Contents

customDataset

get_data(config)

sample_return(root)

get_data(config)
class customDataset(root, transform=None)

Bases: torch.utils.data.Dataset

__getitem__(index)

Retrieves a sample from the dataset at the given index. Args:

index (int): Index of the sample to retrieve.

Returns:

img (PIL.Image): The image data. label (int): The label for the image data.

__len__()
sample_return(root)
net
Module Contents

LeNet

get_net(config)

class LeNet(in_channels=1, num_classes=10)

Bases: torch.nn.Module

forward(x)
get_net(config)
net_lib
Module Contents

load_data(config)

flush_memory()

train_model(net, trainloader, epochs, device[, deadline])

Trains a neural network model on a given dataset using SGD optimizer with Cross Entropy Loss criterion.

train_fedavg(net, trainloader, epochs, device[, deadline])

Trains a given neural network using the Federated Averaging (FedAvg) algorithm.

train_feddyn(net, trainloader, epochs, device[, deadline])

Trains a given neural network using the FedDyn algorithm.

train_mimelite(net, state, trainloader, epochs, device)

Trains a given neural network using the MimeLite algorithm.

train_mime(net, state, control_variate, trainloader, ...)

Trains a given neural network using the Mime algorithm.

train_scaffold(net, server_c, trainloader, epochs, device)

Trains a given neural network using the Scaffold algorithm.

test_model(net, testloader, device)

Evaluate the performance of a model on a test dataset.

load_data(config)
flush_memory()
train_model(net, trainloader, epochs, device, deadline=None)

Trains a neural network model on a given dataset using SGD optimizer with Cross Entropy Loss criterion. Args:

net: neural network model trainloader: PyTorch DataLoader object for training dataset epochs: number of epochs to train the model deadline: optional deadline time for training

Returns:

trained model with the difference between trained model and the received model

train_fedavg(net, trainloader, epochs, device, deadline=None)

Trains a given neural network using the Federated Averaging (FedAvg) algorithm.

Args: net: A PyTorch neural network model trainloader: A PyTorch DataLoader containing the training dataset epochs: An integer specifying the number of training epochs deadline: An optional deadline (in seconds) for the training process

Returns: A trained PyTorch neural network model

train_feddyn(net, trainloader, epochs, device, deadline=None)

Trains a given neural network using the FedDyn algorithm. Args: net: A PyTorch neural network model trainloader: A PyTorch DataLoader containing the training dataset epochs: An integer specifying the number of training epochs deadline: An optional deadline (in seconds) for the training process

Returns: A trained PyTorch neural network model

train_mimelite(net, state, trainloader, epochs, device, deadline=None)

Trains a given neural network using the MimeLite algorithm.

Args: net: A PyTorch neural network model trainloader: A PyTorch DataLoader containing the training dataset epochs: An integer specifying the number of training epochs deadline: An optional deadline (in seconds) for the training process

Returns: A trained PyTorch neural network model

In the case of MimeLite, control_variate is nothing but a state like in case of momentum method

train_mime(net, state, control_variate, trainloader, epochs, device, deadline=None)

Trains a given neural network using the Mime algorithm.

Args: net: A PyTorch neural network model trainloader: A PyTorch DataLoader containing the training dataset epochs: An integer specifying the number of training epochs deadline: An optional deadline (in seconds) for the training process

Returns: A trained PyTorch neural network model

train_scaffold(net, server_c, trainloader, epochs, device, deadline=None)

Trains a given neural network using the Scaffold algorithm.

Args: net: A PyTorch neural network model trainloader: A PyTorch DataLoader containing the training dataset epochs: An integer specifying the number of training epochs deadline: An optional deadline (in seconds) for the training process

Returns: A trained PyTorch neural network model

test_model(net, testloader, device)

Evaluate the performance of a model on a test dataset.

Args: net (torch.nn.Module): The neural network model to evaluate. testloader (torch.utils.data.DataLoader): The data loader for the test dataset.

Returns: Tuple: The average loss and accuracy of the model on the test dataset.

start_client
Module Contents
parser
args
configs
server
src
server_evaluate
eval_lib
Module Contents

server_eval(model_state_dict, config)

server_eval(model_state_dict, config)
Package Contents

server_eval(model_state_dict, config)

server_eval(model_state_dict, config)
ClientConnection_pb2

Generated protocol buffer code.

Module Contents
_sym_db
DESCRIPTOR
_SERVERMESSAGE
_CLIENTMESSAGE
_TRAINORDER
_TRAINRESPONSE
_EVALORDER
_EVALRESPONSE
_SETPARAMSORDER
_SETPARAMSRESPONSE
_DISCONNECTORDER
ServerMessage
ClientMessage
TrainOrder
TrainResponse
EvalOrder
EvalResponse
SetParamsOrder
SetParamsResponse
DisconnectOrder
_CLIENTCONNECTION
ClientConnection_pb2_grpc

Client and server classes corresponding to protobuf-defined services.

Module Contents

ClientConnectionStub

Missing associated documentation comment in .proto file.

ClientConnectionServicer

Missing associated documentation comment in .proto file.

ClientConnection

Missing associated documentation comment in .proto file.

add_ClientConnectionServicer_to_server(servicer, server)

class ClientConnectionStub(channel)

Missing associated documentation comment in .proto file.

class ClientConnectionServicer

Missing associated documentation comment in .proto file.

Connect(request_iterator, context)

Missing associated documentation comment in .proto file.

add_ClientConnectionServicer_to_server(servicer, server)
class ClientConnection

Missing associated documentation comment in .proto file.

static Connect(request_iterator, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None)
client_connection_servicer
Module Contents

ClientConnectionServicer

class ClientConnectionServicer(client_manager)

Bases: federa.server.src.ClientConnection_pb2_grpc.ClientConnectionServicer

Connect(request_iterator, context)
client_manager
Module Contents

ClientManager

class ClientManager
select(num_of_clients=None, fraction=None, timeout=None)
random_select(num_of_clients=None, fraction=None, timeout=None)
register(client)
num_connected_clients()
deregister(client_index)
wait_for(minimum_clients, timeout)
client_wrapper
Module Contents

ClientWrapper

class ClientWrapper(send_buffer, recieve_buffer, client_id)
train(model_parameters, control_variate, control_variate2, config_dict)
evaluate(model_parameters, config_dict)
set_parameters(model_parameters)
check_disconnection()
is_disconnected()
disconnect(reconnect_time=0, message='Thank you for participating.')
server
Module Contents

server_runner(client_manager, configurations)

server_start(configurations)

server_runner(client_manager, configurations)
server_start(configurations)
server_lib
Module Contents

customDataset

LeNet

load_data(config)

get_data(config)

sample_return(root)

get_net(config)

train_model(net, trainloader)

test_model(net, testloader)

save_intial_model(config)

device

device
load_data(config)
get_data(config)
class customDataset(root, transform=None)

Bases: torch.utils.data.Dataset

__getitem__(index)
__len__()
sample_return(root)
class LeNet(in_channels=1, num_classes=10)

Bases: torch.nn.Module

forward(x)
get_net(config)
train_model(net, trainloader)
test_model(net, testloader)
save_intial_model(config)
verification
Module Contents

verify(clients, trained_model_state_dicts, save_dir_path)

random_derangement(list_to_shuffle)

verify(clients, trained_model_state_dicts, save_dir_path, threshold=0)
random_derangement(list_to_shuffle)
start_server
Module Contents
parser
args
configurations

# start the server with the given parameters

tests
minitest
Module Contents

TestTrainer_verification

Test case for verification module

TestTrainer_timeout

Test case for timeout module

TestTrainer_intermediate

Test case for intermediate client connections module

TestTrainer_fedavg

Test case for FedAvg

TestTrainer_fedadam

Test case for FedAdam

create_train_test_for_fedavg()

Verify the FedAvg aggregation algorithm using two clients

create_train_test_for_fedadam()

Verify the FedAdam aggregation algorithm using two clients

create_train_test_for_verification_module()

Verify the verification module using two clients by implementing the following function.

create_train_test_for_timeout_module()

Verify the timeout module using two clients by implementing the following function.

create_train_test_for_intermediate_connection_module()

Verify the intermediate connection module using two clients

create_train_test_for_fedavg()

Verify the FedAvg aggregation algorithm using two clients by implementing the following function.

create_train_test_for_fedadam()

Verify the FedAdam aggregation algorithm using two clients by implementing the following function.

create_train_test_for_verification_module()

Verify the verification module using two clients by implementing the following function.

create_train_test_for_timeout_module()

Verify the timeout module using two clients by implementing the following function.

create_train_test_for_intermediate_connection_module()

Verify the intermediate connection module using two clients by implementing the following function.

class TestTrainer_verification(methodName='runTest')

Bases: create_train_test_for_verification_module()

Test case for verification module

class TestTrainer_timeout(methodName='runTest')

Bases: create_train_test_for_timeout_module()

Test case for timeout module

class TestTrainer_intermediate(methodName='runTest')

Bases: create_train_test_for_intermediate_connection_module()

Test case for intermediate client connections module

class TestTrainer_fedavg(methodName='runTest')

Bases: create_train_test_for_fedavg()

Test case for FedAvg

class TestTrainer_fedadam(methodName='runTest')

Bases: create_train_test_for_fedadam()

Test case for FedAdam

misc
Module Contents

get_config(action, action2[, config_path])

Get the configuration as JSON from the configuration file

tester(configs, no_of_clients[, late])

Return the tester to each test algorithm.

get_config(action, action2, config_path='')

Get the configuration as JSON from the configuration file

tester(configs, no_of_clients, late=None)

Return the tester to each test algorithm. Late is introduced for intermediate connection

Package Contents
__version__ = '0.0.0'

scaffold

Module Contents

scaffold

class scaffold(config)
aggregate(server_model_state_dict, control_variate, state_dicts, updated_control_variates)

fedadam

Module Contents

fedadam

class fedadam(config)
aggregate(server_state_dict, state_dicts)

fedavgm

Module Contents

fedavgm

class fedavgm(config)
aggregate(server_state_dict, state_dicts)

fedadagrad

Module Contents

fedadagrad

class fedadagrad(config)
aggregate(server_state_dict, state_dicts)

fedavg

Module Contents

fedavg

class fedavg(config)
aggregate(server_state_dict, state_dicts)

mimelite

Module Contents

mimelite

class mimelite(config)
aggregate(server_model_state_dict, optimizer_state, state_dicts, gradients_x)

mime

Module Contents

mime

class mime(config)
aggregate(server_model_state_dict, optimizer_state, state_dicts, gradients_x)

feddyn

Module Contents

feddyn

class feddyn(config)
aggregate(server_model_state_dict, state_dicts)

fedyogi

Module Contents

fedyogi

class fedyogi(config)
aggregate(server_state_dict, state_dicts)

Contacts

Contact the FedERA development team through GitHub issues or email.