Source code for pydsef.service
import rpyc
from rpyc.utils.server import ThreadedServer
import copy
import inspect
import shutil
import os
class Registry:
    """Class for registering experiment functions.

    All state lives on the class itself, so registrations are shared by every
    user of ``Registry`` in the process. The ``*_list`` attributes hold the
    registered functions in registration order; ``run_fun`` holds the single
    'run' function (or ``None`` when none has been registered).
    """

    connect_list = []
    disconnect_list = []
    setup_list = []
    launch_list = []
    run_fun = None
    teardown_list = []
    # Set by ``experiment`` to the ThreadedServer it creates; None until then.
    server = None

    @staticmethod
    def connect(fun):
        """Decorator for registering 'connect' functions. These functions are run when
        the Jupyter server connects to the master node over RPyC.

        Returns ``fun`` unchanged so the decorated name stays usable."""
        Registry.connect_list.append(fun)
        return fun

    @staticmethod
    def disconnect(fun):
        """Decorator for registering 'disconnect' functions. These functions are run when
        the Jupyter server disconnects from the RPyC server running on the master node.

        Returns ``fun`` unchanged so the decorated name stays usable."""
        Registry.disconnect_list.append(fun)
        return fun

    @staticmethod
    def setup(fun):
        """Decorator for registering 'setup' functions. These functions are run during the
        setup phase of each trial in an experiment.

        Returns ``fun`` unchanged so the decorated name stays usable."""
        Registry.setup_list.append(fun)
        return fun

    @staticmethod
    def launch(fun):
        """Decorator for registering 'launch' functions. These functions are run during the
        launch phase of each trial in an experiment.

        Returns ``fun`` unchanged so the decorated name stays usable."""
        Registry.launch_list.append(fun)
        return fun

    @staticmethod
    def run(fun):
        """Decorator for registering a 'run' function. You can only register one of these. This
        function is executed during the run phase of each trial in an experiment.

        Returns ``fun`` unchanged.

        Raises:
            RuntimeError: if a 'run' function has already been registered.
        """
        # Identity check: a registered callable with a custom __eq__ must not
        # be mistaken for "nothing registered yet".
        if Registry.run_fun is not None:
            raise RuntimeError(
                "a 'run' function is already registered; only one is allowed"
            )
        Registry.run_fun = fun
        return fun

    @staticmethod
    def teardown(fun):
        """Decorator for registering 'teardown' functions. These functions are run during the
        teardown phase of each trial in an experiment.

        Returns ``fun`` unchanged so the decorated name stays usable."""
        Registry.teardown_list.append(fun)
        return fun

    @staticmethod
    def experiment(cls, port=18861):
        """Class decorator for registering an experiment service. This automatically adds an
        archive function to the class as well as starting a ThreadedServer RPyC instance.

        Args:
            cls: the service class being decorated; it gains an
                ``exposed_archive`` method.
            port: TCP port passed to the ThreadedServer.

        Returns:
            ``cls``, once ``server.start()`` has returned.
        """
        def archive(self, *filenames):
            # One gzipped tarball per name; make_archive returns the path of
            # the archive file it wrote.
            return [
                shutil.make_archive(fn, 'gztar', base_dir=fn)
                for fn in filenames
            ]

        cls.exposed_archive = archive

        # NOTE(review): 'allow_pickle' lets the peer send pickled objects, which
        # is unsafe if untrusted clients can reach this port - confirm the
        # server is only exposed to trusted hosts.
        server = ThreadedServer(cls, port=port, protocol_config={'allow_pickle': True})
        # Record the instance on the class before starting it, so registered
        # functions can reach the running server through Registry.server.
        Registry.server = server
        print("Starting RPyC Server...")
        server.start()
        return cls
class Service(rpyc.Service):
    """Base class for the experimental run object.

    None of these methods are meant to be called by a user; they are called
    at the appropriate time during an experiment. Each method forwards to the
    functions held by ``Registry``, passing this service instance as the
    first argument.
    """

    def on_connect(self, *args, **kwargs):
        """Needed by the rpyc.Service superclass.

        Calls every function in ``Registry.connect_list``, in order, with
        ``self`` followed by the arguments received here.
        """
        for hook in Registry.connect_list:
            hook(self, *args, **kwargs)

    def on_disconnect(self, *args, **kwargs):
        """Needed by the rpyc.Service superclass.

        Calls every function in ``Registry.disconnect_list``, in order, with
        ``self`` followed by the arguments received here.
        """
        for hook in Registry.disconnect_list:
            hook(self, *args, **kwargs)

    def exposed_setup(self, exp_dict, conf):
        """Expose a 'setup' function on the RPyC service.

        During an experiment this is connected to from Jupyter and executed
        on the master node. Both arguments are deep-copied first; every key
        of the copied ``exp_dict`` is then set as an attribute on this
        instance, and finally each registered 'setup' function is called
        with ``(self, exp_dict_copy, conf_copy)``.
        """
        local_exp = copy.deepcopy(exp_dict)
        local_conf = copy.deepcopy(conf)

        # Mirror the experiment parameters onto the service instance.
        for name in local_exp:
            setattr(self, name, local_exp[name])

        for hook in Registry.setup_list:
            hook(self, local_exp, local_conf)

    def exposed_launch(self, *args, **kwargs):
        """Expose a 'launch' function on the RPyC service.

        During an experiment this is connected to from Jupyter and executed
        on the master node. Calls every function in ``Registry.launch_list``
        with ``self`` followed by the arguments received here.
        """
        for hook in Registry.launch_list:
            hook(self, *args, **kwargs)

    def exposed_run(self, *args, **kwargs):
        """Expose a 'run' function on the RPyC service.

        During an experiment this is connected to from Jupyter and executed
        on the master node. Calls ``Registry.run_fun`` with ``self`` followed
        by the arguments received here and returns its result.
        """
        run_hook = Registry.run_fun
        return run_hook(self, *args, **kwargs)

    def exposed_teardown(self, *args, **kwargs):
        """Expose a 'teardown' function on the RPyC service.

        During an experiment this is connected to from Jupyter and executed
        on the master node. Calls every function in ``Registry.teardown_list``
        with ``self`` followed by the arguments received here.
        """
        for hook in Registry.teardown_list:
            hook(self, *args, **kwargs)