"""
Test module documentation string for app.py
"""
import abc
import datetime
import json
import numpy as np
import pickle
import sys
import threading
import traceback
from enum import Enum
from time import sleep
from typing import Dict, List, Tuple, Union, TypedDict, Literal
DATA_POLL_INTERVAL = 0.1 # Interval (seconds) to check for new data pieces, adapt if necessary
TERMINAL_WAIT = 10 # Time (seconds) to wait before final shutdown, to allow the controller to pick up the newest
# progress etc.
TRANSITION_WAIT = 1 # Time (seconds) to wait between state transitions
class Role(Enum):
"""
| Describes the Role of a client
| Can be one of the following values:
| Role.PARTICIPANT
| Role.COORDINATOR
| Role.BOTH
"""
PARTICIPANT = (True, False)
COORDINATOR = (False, True)
BOTH = (True, True)
class State(Enum):
"""
| Describes the current State of the app instance
| Can be one of the following values:
| State.RUNNING
| State.ERROR
| State.ACTION
"""
RUNNING = 'running'
ERROR = 'error'
ACTION = 'action_required'
class SMPCOperation(Enum):
    """
    | Its values are used to describe SMPC operations for other functions
    | One of the following:
    | SMPCOperation.ADD
    | SMPCOperation.MULTIPLY
    """
ADD = 'add'
MULTIPLY = 'multiply'
class SMPCSerialization(Enum):
    """
    | Describes the serialization used with data when using SMPC, i.e. the format in which data is sent between different components of FeatureCloud, e.g. between the app instance and the controller
    | Currently only the following is supported:
    | SMPCSerialization.JSON
    """
JSON = 'json'
class LogLevel(Enum):
    """
    | The level of a log message, given to the log function.
    | LogLevel.DEBUG: Just for debugging
    | LogLevel.ERROR: Logs an error to stderr but does not halt the program
    | LogLevel.FATAL: Raises an exception and stops the program
    """
DEBUG = 'info'
ERROR = 'error'
FATAL = 'fatal'
class DPNoisetype(Enum):
GAUSS = 'gauss'
LAPLACE = 'laplace'
class DPSerialization(Enum):
JSON = 'json'
class SMPCType(TypedDict):
operation: Literal['add', 'multiply']
serialization: Literal['json']
shards: int
exponent: int
class DPType(TypedDict):
serialization: Literal['json']
noisetype: Literal['gauss', 'laplace']
epsilon: float
delta: float
sensitivity: Union[float, None]
clippingVal: Union[float, None]
class App:
""" Implementing the workflow for the FeatureCloud platform.
Attributes
----------
id: str
coordinator: bool
clients: list
send_counter: int
receive_counter: int
status_available: bool
status_finished: bool
status_message: str
status_progress: float
status_state: str
status_destination: str
status_smpc: SMPCType
status_dp: DPType
status_memo: str
default_smpc: dict
default_dp: dict
    data_incoming: dict mapping memo (str) to [(data, sending_client_id: str), ...]
    data_outgoing: list of (data, status_json: str) tuples
thread: threading.Thread
states: Dict[str, AppState]
transitions: Dict[str, Tuple[AppState, AppState, bool, bool]]
transition_log: List[Tuple[datetime.datetime, str]]
internal: dict
current_state: AppState
last_send_status: str (JSON)
Methods
-------
    handle_setup(client_id, coordinator, clients, coordinatorID)
    handle_incoming(data, client, memo)
handle_outgoing()
get_current_status(**kwargs)
guarded_run()
run()
register()
_register_state(name, state, participant, coordinator, **kwargs)
    register_transition(name, source, target, participant, coordinator, label)
    transition(name)
    log(msg, level)
"""
def __init__(self):
self.id = None
self.coordinator = None
self.coordinatorID = None
self.clients = None
self.default_memo = None
self.thread: Union[threading.Thread, None] = None
self.send_counter: int = 0
self.receive_counter: int = 0
# the send counter is increased with any send_data_to_coordinator
# call while the receive counter is increased with any
# aggregate_data and gather_data call as well as with
# await_data with n=#clients OR n=#clients-1 OR n=1 and smpc=True
# This should work in 99% of all cases, except when
# somebody uses send_data_to_participant from each client and then
# uses await_data on all these #clients -1 data pieces.
self.data_incoming = {}
        # dictionary mapping memo: [(data, client),...]
        # data is the data serialized with JSON when SMPC or DP is used,
        # otherwise serialized the way that the sender used (usually pickle)
        # client is the id of the client that sent the data
        # memo is the memo sent alongside the data to identify the
        # communication round the data belongs to
self.data_outgoing = []
# list of all data objects and the corresponding status to use with them
# format: list of tuples, each tuple contains dataObject, status as JSON string
self.default_smpc: SMPCType = {'operation': 'add', 'serialization': 'json', 'shards': 0, 'exponent': 8}
self.default_dp: DPType = {'serialization': 'json', 'noisetype': 'laplace',
'epsilon': 1.0, 'delta': 0.0,
'sensitivity': None, 'clippingVal': 10.0}
self.current_state: Union[AppState, None] = None
self.states: Dict[str, AppState] = {}
self.transitions: Dict[
str, Tuple[AppState, AppState, bool, bool, str]] = {} # name => (source, target, participant, coordinator, label)
self.transition_log: List[Tuple[datetime.datetime, str]] = []
self.internal = {}
self.status_available: bool = False
self.status_finished: bool = False
self.status_message: Union[str, None] = None
self.status_progress: Union[float, None] = None
self.status_state: Union[str, None] = None
self.status_destination: Union[str, None] = None
self.status_smpc: Union[SMPCType, None] = None
self.status_dp: Union[DPType, None] = None
self.status_memo: Union[str, None] = None
self.last_send_status = self.get_current_status()
# Add terminal state
@app_state('terminal', Role.BOTH, self)
class TerminalState(AppState):
def register(self):
pass
def run(self) -> Union[str, None]:
pass
def handle_setup(self, client_id, coordinator, clients, coordinatorID):
""" It will be called on startup and contains information about the
execution context of this instance. And registers all of the states.
Parameters
----------
client_id: str
coordinator: bool
clients: list
coordinatorID: str
"""
self.id = client_id
self.coordinator = coordinator
self.coordinatorID = coordinatorID
self.clients = clients
self.log(f'id: {self.id}')
self.log(f'coordinator: {self.coordinator}')
self.log(f'clients: {self.clients}')
self.current_state = self.states.get('initial')
if not self.current_state:
self.log('initial state not found', level=LogLevel.FATAL)
self.thread = threading.Thread(target=self.guarded_run)
self.thread.start()
def guarded_run(self):
""" run the workflow while trying to catch possible exceptions
"""
try:
self.run()
except Exception as e: # catch all # noqa
self.log(traceback.format_exc())
self.status_message = e.__class__.__name__
self.status_state = State.ERROR.value
self.status_finished = True
self.data_outgoing.insert(0, (None, self.get_current_status(
finished=True, state=State.ERROR.value,
message=e.__class__.__name__)))
            # insert the error status at the front of the queue so the
            # controller picks it up on the next poll and fails the workflow
def run(self):
""" Runs the workflow, logs the current state, executes it,
and handles the transition to the next desired state.
        Once the app transitions to the terminal state, the workflow terminates.
"""
while True:
self.log(f'state: {self.current_state.name}')
transition = self.current_state.run()
self.log(f'transition: {transition}')
self.transition(f'{self.current_state.name}_{transition}')
if self.current_state.name == 'terminal':
sleep(TERMINAL_WAIT)
terminal_status_added = False
while True:
if not terminal_status_added:
# add finished status answer to the outgoing data queue
status = self.get_current_status(progress=1.0,
message="terminal",
finished=True)
self.data_outgoing.append((None, status))
# only append to ensure that all data in the pipe is still
# sent out
terminal_status_added = True
sleep(TERMINAL_WAIT)
# potentially this wait time clears the queue already
                    if len(self.data_outgoing) > 1:
                        # there is still data to be sent out
                        self.log(f'done, waiting for the last {len(self.data_outgoing)-1} data pieces to be sent')
                    elif len(self.data_outgoing) == 1:
                        sleep(TERMINAL_WAIT)
                        # the last status that was added before is never
                        # removed, so we finish when only one status is
                        # left.
                        # To ensure that this last status has been pulled,
                        # we just wait
                        self.log('done')
return
sleep(TRANSITION_WAIT)
sleep(TRANSITION_WAIT)
def register(self):
""" Registers all of the states transitions
it should be called once all of the states are registered.
"""
for s in self.states:
state = self.states[s]
state.register()
def get_current_status(self, **kwargs):
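        """ Assembles the current status_* attributes into a status dict;
        keyword arguments override the corresponding fields.
        """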
status = dict()
status["available"] = self.status_available
status["finished"] = self.status_finished
status["message"] = self.status_message
status["progress"] = self.status_progress
status["state"] = self.status_state
status["destination"] = self.status_destination
status["smpc"] = self.status_smpc
status["dp"] = self.status_dp
status["memo"] = self.status_memo
for key, value in kwargs.items():
# set whatever is wanted from the arguments
status[key] = value
return status
def handle_incoming(self, data, client, memo):
""" When new data arrives, it appends it to the
`data_incoming` attribute to be accessible for app states.
Parameters
----------
data: list
encoded data
        client: str
            ID of the client that sent the data
        memo: str
            identifier of the communication round this data belongs to
        """
if memo not in self.data_incoming:
self.data_incoming[memo] = [(data, client)]
else:
self.data_incoming[memo].append((data, client))
def handle_status(self):
""" This informs if there is any data to be sent as well as the way
data should be send
"""
self.status_message = self.status_message if self.status_message else (self.current_state.name if self.current_state else None)
# ensure that some message is set
if len(self.data_outgoing) == 0:
# no data to send, just return the default status with available=false
return self.get_current_status(available=False)
# else take the status from the data to be sent out next. The whole
# data, status combination gets popped in the next handle_outgoing
# function call by the next GET request from the controller, so here
# the status and data itself must still be kept in the list
_, status = self.data_outgoing[0]
self.last_send_status = status
return status
def handle_outgoing(self):
""" When it is requested to send some data to other client/s
it will be called to deliver the data to the FeatureCloud Controller.
Then sets status variables (status_*) accordingly for the next data to send from
self.data_outgoing
"""
if len(self.data_outgoing) == 0:
# no data to send
return None
# extract current data to be sent
data, status = self.data_outgoing.pop(0)
# check if the last status request was answered with the same status
# as the data is supposed to be sent with
# we just compare the JSON strings
if status != self.last_send_status:
            raise Exception("Race condition error, the controller got sent a " +
                            "different GET/status object than the one intended for this data object. " +
                            "Needed status object: {}, actual status object: {}".format(
                                status, self.last_send_status))
# status is fine, send data out
return data
def _register_state(self, name, state, participant, coordinator, **kwargs):
""" Instantiates a state, provides app-level information and adds it as part of the app workflow.
Parameters
----------
name: str
state: AppState
participant: bool
coordinator: bool
"""
if self.transitions.get(name):
self.log(f'state {name} already exists', level=LogLevel.FATAL)
si = state(**kwargs)
si._app = self
si.name = name
si.participant = participant
si.coordinator = coordinator
self.states[si.name] = si
def register_transition(self, name: str, source: str, target: str, participant=True, coordinator=True, label: Union[str,None] = None):
""" Receives transition registration parameters, check the validity of its logic,
and consider it as one possible transitions in the workflow.
There will be exceptions if apps try to register a transition with contradicting roles.
Parameters
----------
name: str
Name of the transition
source: str
Name of the source state
target: str
Name of the target state
participant: bool
Indicates whether the transition is allowed for participant role
        coordinator: bool
            Indicates whether the transition is allowed for the coordinator role
        label: str or None, default=None
            Optional label for the transition
"""
if not participant and not coordinator:
self.log('either participant or coordinator must be True', level=LogLevel.FATAL)
if self.transitions.get(name):
self.log(f'transition {name} already exists', level=LogLevel.FATAL)
source_state = self.states.get(source)
if not source_state:
self.log(f'source state {source} not found', level=LogLevel.FATAL)
if participant and not source_state.participant:
self.log(f'source state {source} not accessible for participants', level=LogLevel.FATAL)
if coordinator and not source_state.coordinator:
self.log(f'source state {source} not accessible for the coordinator', level=LogLevel.FATAL)
target_state = self.states.get(target)
if not target_state:
self.log(f'target state {target} not found', level=LogLevel.FATAL)
if participant and not target_state.participant:
self.log(f'target state {target} not accessible for participants', level=LogLevel.FATAL)
if coordinator and not target_state.coordinator:
self.log(f'target state {target} not accessible for the coordinator', level=LogLevel.FATAL)
self.transitions[name] = (source_state, target_state, participant, coordinator, label)
def transition(self, name):
""" Transits the app workflow to the unique next state based on
current states, the role of the FeatureCloud client,
and requirements of registered transitions for the current state.
Parameters
----------
name: str
            Name of the transition (which includes the names of the current and the next state).
"""
transition = self.transitions.get(name)
if not transition:
self.log(f'transition {name} not found', level=LogLevel.FATAL)
if transition[0] != self.current_state:
self.log('current state unequal to source state', level=LogLevel.FATAL)
if not transition[2] and not self.coordinator:
self.log(f'cannot perform transition {name} as participant', level=LogLevel.FATAL)
if not transition[3] and self.coordinator:
self.log(f'cannot perform transition {name} as coordinator', level=LogLevel.FATAL)
self.transition_log.append((datetime.datetime.now(), name))
self.current_state = transition[1]
self.status_message = self.current_state.name
def log(self, msg, level: LogLevel = LogLevel.DEBUG):
"""
Prints a log message or raises an exception according to the log level.
Parameters
----------
msg : str
message to be displayed
level : LogLevel, default=LogLevel.DEBUG
determines the channel (stdout, stderr) or whether to trigger an exception
"""
msg = f'[Time: {datetime.datetime.now().strftime("%d.%m.%y %H:%M:%S")}] [Level: {level.value}] {msg}'
if level == LogLevel.FATAL:
raise RuntimeError(msg)
if level == LogLevel.ERROR:
print(msg, flush=True, file=sys.stderr)
else:
print(msg, flush=True)
class AppState(abc.ABC):
    """ Defining custom states
    AppState is the class used when programming a FeatureCloud app to create
    states and to communicate with other clients. Generally, a FeatureCloud
    app consists of different states, each implemented as a subclass of AppState.
The states themselves are created as classes with the
@app_state("statename") decorator.
See the example below or the template apps for more details.
Attributes
----------
app: App
name: str
Properties
----------
is_coordinator: bool
clients: list[str]
id: str
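    Example
    -------
    A minimal sketch of a single-state app (the state name 'initial' is the
    required entry point; 'terminal' is the built-in end state):

    >>> @app_state('initial', Role.BOTH)
    ... class InitialState(AppState):
    ...     def register(self):
    ...         self.register_transition('terminal', Role.BOTH)
    ...     def run(self):
    ...         self.log('hello from the initial state')
    ...         return 'terminal'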
"""
def __init__(self):
self._app = None
self.name = None
    @abc.abstractmethod
    def register(self):
        """ Abstract method that should be implemented by developers;
        it calls AppState.register_transition to register the transitions of this state.
        It is called in the App.register method so that, once all states are
        defined, all app transitions can be registered in a verifiable way.
"""
    @abc.abstractmethod
    def run(self) -> str:
        """ Abstract method that should be implemented by developers;
        it executes all local or global operations and calculations of the state.
        It is called in the App.run method and returns the name of the
        transition to the next state.
"""
@property
def is_coordinator(self):
""" Boolean variable, if True the this AppState instance represents the
coordinator. False otherwise.
"""
return self._app.coordinator
@property
def clients(self):
""" Contains a list of client IDs of all clients involved in the
current learning run.
"""
return self._app.clients
@property
def id(self):
""" Contains the id of this client
"""
return self._app.id
@property
def coordintor_id(self):
""" Contains the id of the coordinator
"""
return self._app.coordinatorID
    def register_transition(self, target: str, role: Role = Role.BOTH, name: Union[str, None] = None, label: Union[str, None] = None):
"""
Registers a transition in the state machine.
Parameters
----------
target : str
name of the target state
role : Role, default=Role.BOTH
role for which this transition is valid
        name : str or None, default=None
            name of the transition; defaults to the name of the target state
        label : str or None, default=None
            optional label for the transition
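        Example
        -------
        A hedged sketch inside a state's ``register`` method, assuming
        hypothetical states named 'aggregate' and 'wait' exist:

        >>> def register(self):
        ...     self.register_transition('aggregate', Role.COORDINATOR)
        ...     self.register_transition('wait', Role.PARTICIPANT)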
"""
if not name:
name = target
participant, coordinator = role.value
self._app.register_transition(f'{self.name}_{name}', self.name, target, participant, coordinator, label)
    def aggregate_data(self, operation: SMPCOperation = SMPCOperation.ADD, use_smpc=False,
use_dp=False, memo=None):
"""
Waits for all participants (including the coordinator instance)
        to send data and returns the aggregated value. It will try to convert
        each data piece to a np.array and aggregate those arrays.
        Therefore, this method only works for numerical data, and all data
        pieces must be compatible with the chosen operation.
Parameters
----------
operation : SMPCOperation
specifies the aggregation type
use_smpc : bool, default=False
if True, the data to be aggregated is expected to stem from an SMPC aggregation
        use_dp: bool, default=False
            if True, assumes that the data was sent and modified with the
            controller's differential privacy capabilities (with use_dp=True in the
            corresponding send function). This must be set in the receiving
            function due to serialization differences with DP
        memo : str or None, default=None
            the string identifying a specific communication round.
            Any app should ensure that this string is the same across all clients
            for the same communication round and that a different string is
            used for each communication round. This ensures that no race
            condition problems occur
Returns
-------
aggregated value
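        Example
        -------
        A hedged sketch of one aggregation round; local_value is a
        placeholder for any numerical object, and the memo is arbitrary
        but must match on all clients:

        >>> # on every client (including the coordinator):
        >>> self.send_data_to_coordinator(local_value, memo='agg_round_1')
        >>> # on the coordinator only:
        >>> total = self.aggregate_data(SMPCOperation.ADD, memo='agg_round_1')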
"""
if not memo:
self._app.receive_counter += 1
memo = f"GATHERROUND{self._app.receive_counter}"
if use_smpc:
return self.await_data(n=1, unwrap=True, is_json=True, memo=memo)
# Data is aggregated already
else:
data = self.gather_data(is_json=use_dp, memo=memo)
return _aggregate(data, operation)
# Data needs to be aggregated according to operation
    def gather_data(self, is_json=False, use_smpc=False, use_dp=False, memo=None):
"""
Waits for all participants (including the coordinator instance) to send data and returns a list containing the received data pieces. Only valid for the coordinator instance.
Parameters
----------
is_json : bool, default=False
            [deprecated] when data was sent via DP or SMPC, the data is sent in
            JSON serialization. This flag was used to indicate this but is now
            DEPRECATED; use use_smpc/use_dp accordingly instead, they will take
            care of serialization automatically.
        use_smpc: bool, default=False
            Indicates whether the data to gather was sent using SMPC.
            If this is not set to True when data was sent using SMPC, this
            function ends up in an endless loop
        use_dp: bool, default=False
            if True, assumes that the data was sent and modified with the
            controller's differential privacy capabilities (with use_dp=True in the
            corresponding send function). This must be set in the receiving
            function due to serialization differences with DP
        memo : str or None, default=None
            the string identifying a specific communication round.
            Any app should ensure that this string is the same across all clients
            for the same communication round and that a different string is
            used for each communication round. This ensures that no race
            condition problems occur
Returns
-------
list of n data pieces, where n is the number of participants
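        Example
        -------
        A hedged sketch (coordinator side); the memo is arbitrary but must
        match the one used in the corresponding send calls:

        >>> pieces = self.gather_data(memo='params_round_1')
        >>> len(pieces) == len(self.clients)
        True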
"""
if not self._app.coordinator:
self._app.log('must be coordinator to use gather_data', level=LogLevel.FATAL)
n = len(self._app.clients)
if use_smpc or use_dp:
is_json = True
if use_smpc:
n = 1
if not memo:
self._app.receive_counter += 1
memo = f"GATHERROUND{self._app.receive_counter}"
return self.await_data(n, unwrap=False, is_json=is_json, use_dp=use_dp,
use_smpc=use_smpc, memo=memo)
    def await_data(self, n: int = 1, unwrap=True, is_json=False,
use_dp=False, use_smpc=False, memo=None):
"""
Waits for n data pieces and returns them. It is highly recommended to
use the memo variable when using this method
Parameters
----------
n : int, default=1
            number of data pieces to wait for. It is ignored when use_smpc is
            True, as SMPC data is aggregated by the controller and therefore
            arrives as a single data piece
unwrap : bool, default=True
if True, will return the first element of the collected data (only useful if n = 1)
is_json : bool, default=False
            [deprecated] when data was sent via DP or SMPC, the data is sent in
            JSON serialization. This flag was used to indicate this so the data
            could be deserialized correctly, but it is now DEPRECATED; use
            use_smpc/use_dp accordingly instead, they will take care of
            serialization automatically.
        use_dp : bool, default=False
            if True, assumes that the data was sent and modified with the
            controller's differential privacy capabilities (with use_dp=True in the
            corresponding send function). This must be set in the receiving
            function due to serialization differences with DP
        use_smpc: bool, default=False
            if True, will ensure that n is set to 1 and the correct
            serialization is used (SMPC uses JSON serialization)
        memo : str or None, default=None
            RECOMMENDED TO BE SET FOR THIS METHOD!
            the string identifying a specific communication round. The same
            string that is used in this call must be used before in the
            corresponding sending functions.
            Any app should ensure that this string is the same across all clients
            for the same communication round and that a different string is
            used for each communication round. This ensures that no race
            condition problems occur.
Returns
-------
list of data pieces (if n > 1 or unwrap = False) or a single data piece (if n = 1 and unwrap = True)
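        Example
        -------
        A hedged sketch (participant side): wait for a single broadcast
        from the coordinator, reusing the memo the coordinator used:

        >>> model = self.await_data(n=1, unwrap=True, memo='bcast_round_1')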
"""
if use_smpc:
n = 1
is_json = True
if use_dp:
is_json = True
if not memo and self._app.coordinator:
# only increment for the coordinator to really avoid any p2p
# problems
            if (n == len(self._app.clients) and not use_smpc) \
                    or (n == len(self._app.clients) - 1 and not use_smpc) \
                    or use_smpc:
                # this is a gather/aggregate call; in theory
                # (n == 1 and is_json=True) could also be an SMPC gather call,
                # but it cannot be differentiated whether that is an SMPC
                # gather call or a p2p call with DP
self._app.receive_counter += 1
memo = f"GATHERROUND{self._app.receive_counter}"
try:
memo = str(memo)
except Exception as e:
self._app.log(
f"given memo cannot be translated to a string, ERROR: {e}",
                LogLevel.ERROR)
while True:
num_data_pieces = 0
if memo in self._app.data_incoming:
num_data_pieces = len(self._app.data_incoming[memo])
if num_data_pieces >= n:
# warn if too many data pieces came in
if num_data_pieces > n:
self._app.log(
f"await was used to wait for {n} data pieces, " +
f"but more data pieces ({num_data_pieces}) were found. " +
f"Used memo is <{memo}>",
LogLevel.ERROR)
                # extract and deserialize the data
data = self._app.data_incoming[memo][:n]
self._app.data_incoming[memo] = self._app.data_incoming[memo][n:]
if len(self._app.data_incoming[memo]) == 0:
# clean up the dict regularly
del self._app.data_incoming[memo]
if n == 1 and unwrap:
return _deserialize_incoming(data[0][0], is_json=is_json)
else:
return [_deserialize_incoming(d[0], is_json=is_json) for d in data]
sleep(DATA_POLL_INTERVAL)
    def send_data_to_participant(self, data, destination, use_dp=False,
memo=None):
"""
Sends data to a particular participant identified by its ID. Should be
used for any specific communication to individual clients.
For the communication schema of all clients/all clients except the
coordinator sending data to the coordinator, use send_data_to_coordinator
Parameters
----------
data : object
data to be sent
destination : str
destination client ID, get this from e.g. self.clients
use_dp : bool, default=False
            Whether to use differential privacy (DP) before sending out the data.
            To configure the hyperparameters of DP, use the configure_dp method
            before this method. The receiving method must also use the same
            setting of the use_dp flag or there will be serialization problems
memo : str or None, default=None
RECOMMENDED TO BE SET FOR THIS METHOD!
the string identifying a specific communication round.
This ensures that there are no race condition problems and the
correct data piece can be identified by the recipient of the
data piece sent with this function call. The recipient of this data
must use the same memo to identify the data.
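        Example
        -------
        A hedged sketch: send a local result to every other client;
        local_result is a placeholder for any picklable object:

        >>> for client in self.clients:
        ...     if client != self.id:
        ...         self.send_data_to_participant(local_result, client,
        ...                                       memo='p2p_round_1')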
"""
try:
memo = str(memo)
except Exception as e:
self._app.log(
f"given memo cannot be translated to a string, ERROR: {e}",
                LogLevel.ERROR)
data = _serialize_outgoing(data, is_json=use_dp)
if destination == self._app.id and not use_dp:
# In no DP case, the data does not have to be sent via the controller
self._app.handle_incoming(data, client=self._app.id, memo=memo)
else:
# update the status variables and get the status object
message = self._app.status_message if self._app.status_message else (self._app.current_state.name if self._app.current_state else None)
dp = self._app.default_dp if use_dp else None
self._app.status_message = message
status = self._app.get_current_status(message=message,
destination=destination, dp=dp, memo=memo,
available=True)
self._app.data_outgoing.append((data, json.dumps(status, sort_keys=True)))
    def send_data_to_coordinator(self, data, send_to_self=True, use_smpc=False,
use_dp=False, memo=None):
"""
        Sends data to the coordinator instance. When no memo is given, it must
        be used by all clients, or by all clients except the coordinator itself,
        as the automated memo generated with memo=None breaks otherwise.
If any subset of clients should communicate with the coordinator,
either define the memo or use
send_data_to_participant(destination=self.coordintor_id) with a memo.
Parameters
----------
data : object
data to be sent
send_to_self : bool, default=True
if True, the data will also be sent internally to this instance (only applies to the coordinator instance)
use_smpc : bool, default=False
if True, the data will be sent as part of an SMPC aggregation step
use_dp : bool, default=False
            Whether to use differential privacy (DP) before sending out the data.
            To configure the hyperparameters of DP, use the configure_dp method
            before this method. The receiving method must also use the same
            setting of the use_dp flag or there will be serialization problems
memo : str or None, default=None
the string identifying a specific communication round.
This ensures that there are no race condition problems so that the
coordinator uses the correct data piece from each client for each
communication round. This also ensures that workflows where
participants send data to the coordinator without waiting for a
response work
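        Example
        -------
        A hedged sketch of one communication round; local_params is a
        placeholder for any picklable object:

        >>> self.send_data_to_coordinator(local_params, memo='params_round_1')
        >>> if self.is_coordinator:
        ...     updates = self.gather_data(memo='params_round_1')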
"""
# if no memo is given (default), we use the counter from App
if not memo:
self._app.send_counter += 1
memo = f"GATHERROUND{self._app.send_counter}"
# ensure memo can be sent as a string
try:
memo = str(memo)
except Exception as e:
self._app.log(
f"given memo cannot be translated to a string, ERROR: {e}",
                LogLevel.ERROR)
if use_smpc or use_dp:
data = _serialize_outgoing(data, is_json=True)
else:
data = _serialize_outgoing(data, is_json=False)
if self._app.coordinator and not use_smpc and not use_dp:
            # when the coordinator sends data to itself (if send_to_self is set)
            # and neither DP nor SMPC are used, the data does not have to be
            # sent via the controller
if send_to_self:
self._app.handle_incoming(data, self._app.id, memo)
else:
# for SMPC and DP, the data has to be sent via the controller
if use_dp and self._app.coordinator:
# give the coordinator as destination,
# else, it will be interpreted as a broadcast action
destination = self._app.id
else:
destination = None
# this is interpreted as to the coordinator
message = self._app.status_message if self._app.status_message else (self._app.current_state.name if self._app.current_state else None)
self._app.status_message = message
smpc = self._app.default_smpc if use_smpc else None
dp = self._app.default_dp if use_dp else None
status = self._app.get_current_status(message=message,
destination=destination, smpc=smpc, dp=dp, memo=memo,
available=True)
self._app.data_outgoing.append((data, json.dumps(status, sort_keys=True)))
    def broadcast_data(self, data, send_to_self=True, use_dp=False,
                       memo=None):
"""
Broadcasts data to all participants (only valid for the coordinator instance).
Parameters
----------
data : object
data to be sent
        send_to_self : bool, default=True
            if True, the data will also be sent internally to this coordinator instance
use_dp : bool, default=False
            Whether to use differential privacy (DP) before sending out the data.
            To configure the hyperparameters of DP, use the configure_dp method
            before this method. The receiving method must also use the same
            setting of the use_dp flag or there will be serialization problems
memo : str or None, default=None
the string identifying a specific communication round.
This ensures that there are no race condition problems so that the
participants and the coordinator can differentiate between this
data piece broadcast and other data pieces they receive from the
coordinator.
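        Example
        -------
        A hedged sketch; global_model is a placeholder for any picklable
        object, and the memo must be reused by the receivers:

        >>> if self.is_coordinator:
        ...     self.broadcast_data(global_model, memo='bcast_round_1')
        >>> # on every client (the coordinator receives it via send_to_self):
        >>> model = self.await_data(n=1, memo='bcast_round_1')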
"""
try:
memo = str(memo)
except Exception as e:
self._app.log(
f"given memo cannot be translated to a string, ERROR: {e}",
                LogLevel.ERROR)
if not self._app.coordinator:
self._app.log('only the coordinator can broadcast data', level=LogLevel.FATAL)
        # serialize before broadcast (JSON serialization when DP is used)
        data = _serialize_outgoing(data, is_json=use_dp)
message = self._app.status_message if self._app.status_message else (self._app.current_state.name if self._app.current_state else None)
self._app.status_message = message
dp = self._app.default_dp if use_dp else None
status = self._app.get_current_status(message=message,
destination=None, dp=dp, memo=memo,
available=True)
if send_to_self:
self._app.handle_incoming(data, client=self._app.id, memo=memo)
self._app.data_outgoing.append((data, json.dumps(status, sort_keys=True)))
    def update(self, message: Union[str, None] = None, progress: Union[float, None] = None,
state: Union[State, None] = None):
"""
Updates information about the execution.
Parameters
----------
        message : str
            message briefly summarizing what is happening currently (at most 40 characters)
progress : float
number between 0 and 1, indicating the overall progress
state : State or None
overall state (running, error or action_required)
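        Example
        -------
        A minimal sketch, reporting that half of the work is done:

        >>> self.update(message='training', progress=0.5, state=State.RUNNING)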
"""
if message and len(message) > 40:
self._app.log('message is too long (max: 40)', level=LogLevel.FATAL)
if progress is not None and (progress < 0 or progress > 1):
self._app.log('progress must be between 0 and 1', level=LogLevel.FATAL)
if state is not None and state != State.RUNNING and state != State.ERROR and state != State.ACTION:
self._app.log('invalid state', level=LogLevel.FATAL)
self._app.status_message = message
self._app.status_progress = progress
self._app.status_state = state.value if state else None
    def store(self, key: str, value):
        """ Store allows sharing data across different AppState instances.
Parameters
----------
key: str
value:
"""
self._app.internal[key] = value
    def load(self, key: str):
        """ Load allows accessing data shared across different AppState instances.
Parameters
----------
key: str
Returns
-------
value:
Value stored previously using store
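        Example
        -------
        A minimal sketch: a value stored in one state can be loaded in a
        later state of the same app instance:

        >>> self.store('iteration', 3)
        >>> self.load('iteration')
        3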
"""
return self._app.internal.get(key)
    def log(self, msg, level: LogLevel = LogLevel.DEBUG):
"""
Prints a log message or raises an exception according to the log level.
Parameters
----------
msg : str
message to be displayed
level : LogLevel, default=LogLevel.DEBUG
determines the channel (stdout, stderr) or whether to trigger an exception
"""
self._app.log(f'[State: {self.name}] {msg}', level)
def app_state(name: str, role: Role = Role.BOTH, app_instance: Union[App, None] = None, **kwargs):
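    """ Decorator that registers a class derived from AppState as a state of
    the app workflow.
    Parameters
    ----------
    name: str
        name of the state (the workflow starts in the state named 'initial')
    role: Role, default=Role.BOTH
        role(s) for which the state is accessible
    app_instance: App or None, default=None
        app to register the state with; defaults to the module-level app
    """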
if app_instance is None:
app_instance = app
participant, coordinator = role.value
if not participant and not coordinator:
app_instance.log('either participant or coordinator must be True', level=LogLevel.FATAL)
def func(state_class):
app_instance._register_state(name, state_class, participant, coordinator, **kwargs)
return state_class
return func
class _NumpyArrayEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
# call default Encoder in other cases
return json.JSONEncoder.default(self, obj)
def _serialize_outgoing(data, is_json=False):
"""
Transforms a Python data object into a byte serialization.
Parameters
----------
data : object
data to serialize
is_json : bool, default=False
indicates whether JSON serialization is required
Returns
    -------
    serialized data as bytes (pickle) or str (JSON)
"""
if not is_json:
return pickle.dumps(data)
# we use a custom cls to manage numpy which is quite common
return json.dumps(data, cls=_NumpyArrayEncoder)
def _deserialize_incoming(data: bytes, is_json=False):
"""
Transforms serialized data bytes into a Python object.
Parameters
----------
data : bytes
data to deserialize
is_json : bool, default=False
indicates whether JSON deserialization should be used
Returns
    -------
deserialized data
"""
if not is_json:
return pickle.loads(data)
return json.loads(data)
def _aggregate(data, operation: SMPCOperation):
"""
Aggregates a list of received values.
Parameters
----------
data : array_like
list of data pieces
operation : SMPCOperation
operation to use for aggregation (add or multiply)
Returns
    -------
aggregated value
"""
data_np = [np.array(d) for d in data]
aggregate = data_np[0]
if operation == SMPCOperation.ADD:
for d in data_np[1:]:
aggregate = aggregate + d
if operation == SMPCOperation.MULTIPLY:
for d in data_np[1:]:
aggregate = aggregate * d
return aggregate
app = App()