mbircone.multinode

Functions:

get_cluster_ticket(job_queue_system_type, ...)

A utility to return a ticket needed for scatter_gather() to access a parallel cluster.

scatter_gather(cluster_ticket, func[, ...])

Distribute a function call across multiple nodes, as specified by the cluster_ticket argument.

mbircone.multinode.get_cluster_ticket(job_queue_system_type, num_physical_cores_per_node, num_nodes=1, maximum_memory_per_node='16GB', maximum_allowable_walltime=None, system_specific_args='', local_directory='./', log_directory='./')

A utility to return a ticket needed for scatter_gather() to access a parallel cluster. The defaults are set to use one Python thread per node, on the assumption that this thread calls C code that creates as many threads as there are physical cores in the node.

  • On SLURM, you can use sinfo to get information about your cluster configuration.

  • On SGE, you can use qhost to get information about your cluster configuration.
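On SLURM, the two factors in num_physical_cores_per_node can be read from the sinfo format specifiers %X (sockets per node) and %Y (cores per socket), for example with sinfo -h -o "%X %Y", which prints one pair per group of nodes it reports. The helper below is a hypothetical sketch, not part of mbircone, that turns one such line into a core count. It assumes that "number of cpus" in the parameter description means CPU sockets, and it deliberately leaves out hardware threads per core (%Z), since the parameter asks for physical cores.

```python
def physical_cores_per_node(sinfo_line: str) -> int:
    """Return sockets x cores-per-socket for one line of sinfo -h -o "%X %Y" output.

    Hypothetical helper (not part of mbircone): assumes the line holds exactly
    two integers, sockets per node followed by cores per socket.
    """
    fields = sinfo_line.split()
    if len(fields) != 2:
        raise ValueError(f"expected two fields, got {sinfo_line!r}")
    sockets, cores_per_socket = (int(field) for field in fields)
    # Physical cores only: hardware threads per core (%Z) are not multiplied in.
    return sockets * cores_per_socket


# Illustrative line for a node with 2 sockets and 20 cores per socket.
num_physical_cores_per_node = physical_cores_per_node("2 20")
```

The resulting value would then be passed as num_physical_cores_per_node; any line that is not exactly two integers is rejected with ValueError rather than guessed at.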

Parameters:
  • job_queue_system_type (string) – One of ‘SGE’ (Sun Grid Engine), ‘SLURM’, or ‘LocalHost’.

  • num_physical_cores_per_node (int) – Number of physical cores per node = (number of cpus) x (cores per cpu).

  • num_nodes (int) – [Default=1] Requested number of nodes for parallel computation.

  • maximum_memory_per_node (str, optional) – [Default=’16GB’] Requested maximum memory per node, e.g. ‘100MB’ or ‘16GB’.

  • maximum_allowable_walltime (str, optional) – [Default=None] Maximum allowable walltime as a string in the form D-HH:MM:SS. E.g., ‘0-01:00:00’ for one hour. If None, the scheduler will allocate a system-determined maximum.

  • system_specific_args (str, optional) – [Default=’’] Any additional arguments to pass to the job scheduling system. Consult your local documentation or system administrator.

  • local_directory (str, optional) – [Default=’./’] Desired local directory for file spilling during parallel computation. It is recommended to set this to fast local storage such as /scratch or $TMPDIR.

  • log_directory (str, optional) – [Default=’./’] Desired directory for storing Dask’s job scheduler logs. Each reserved node produces two log files, an error log and an output log, which users can inspect to find any output printed by the parallel functions.
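The two string-valued limits follow fixed formats. As an illustration only, the hypothetical helpers walltime_to_seconds and memory_to_bytes below check a walltime string against the D-HH:MM:SS form and convert a memory string such as ‘16GB’ to bytes. They assume decimal units (1 GB = 10^9 bytes) and accept only the kB, MB, GB, and TB suffixes; neither assumption comes from mbircone, and the job scheduler remains the authority on what it accepts.

```python
import re

# Hypothetical validators, not part of mbircone.
_WALLTIME = re.compile(r"(\d+)-(\d{2}):(\d{2}):(\d{2})")
_MEMORY = re.compile(r"(\d+(?:\.\d+)?)(kB|MB|GB|TB)")
# Assumption: decimal units, e.g. 1 GB = 10**9 bytes.
_UNIT_BYTES = {"kB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}


def walltime_to_seconds(walltime: str) -> int:
    """Convert a 'D-HH:MM:SS' walltime string to a number of seconds."""
    match = _WALLTIME.fullmatch(walltime)
    if match is None:
        raise ValueError(f"expected D-HH:MM:SS, got {walltime!r}")
    days, hours, minutes, seconds = (int(g) for g in match.groups())
    if hours > 23 or minutes > 59 or seconds > 59:
        raise ValueError(f"hours, minutes or seconds out of range in {walltime!r}")
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds


def memory_to_bytes(memory: str) -> int:
    """Convert a memory string such as '100MB' or '16GB' to bytes."""
    match = _MEMORY.fullmatch(memory)
    if match is None:
        raise ValueError(f"expected a number followed by kB/MB/GB/TB, got {memory!r}")
    value, unit = match.groups()
    return round(float(value) * _UNIT_BYTES[unit])
```

With these definitions, walltime_to_seconds('0-01:00:00') returns 3600, matching the one-hour example for maximum_allowable_walltime, and a string without the leading day field, such as '01:00:00', raises ValueError.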

Returns:

A two-element tuple containing:

  • cluster_ticket: A cluster ticket to access the job-queue system via the scatter_gather function.

  • maximum_possible_nb_worker (int): Maximum number of workers that can be requested when deploying the jobs.

mbircone.multinode.scatter_gather(cluster_ticket, func, constant_args={}, variable_args_list=[], min_nodes=None, verbose=1)

Distribute a function call across multiple nodes, as specified by the cluster_ticket argument. The given function, func, is called with a set of keyword arguments: some are the same for all calls, as specified in constant_args, and some vary with each call, as specified in variable_args_list.

Returns a list obtained by collecting the output from each call of func. The length of the output list is the length of variable_args_list.
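The calling contract described above can be illustrated without a cluster. The sketch below is a hypothetical pure-Python stand-in, not mbircone's implementation; serial_scatter_gather and scaled_sum are illustrative names. It calls func once per dictionary in variable_args_list, passing the entries of constant_args alongside, so the output list has one entry per dictionary.

```python
from typing import Any, Callable, Dict, List, Optional


def serial_scatter_gather(
    func: Callable[..., Any],
    constant_args: Optional[Dict[str, Any]] = None,
    variable_args_list: Optional[List[Dict[str, Any]]] = None,
) -> List[Any]:
    """Serial sketch of the documented calling contract (hypothetical helper)."""
    constant_args = constant_args or {}
    variable_args_list = variable_args_list or []
    outputs = []
    for variable_args in variable_args_list:
        # Shared and per-call keywords are passed together; in this sketch a key
        # present in both dictionaries raises TypeError (multiple values).
        outputs.append(func(**constant_args, **variable_args))
    return outputs


def scaled_sum(x, y, scale):
    """Toy function used only for illustration."""
    return scale * (x + y)


results = serial_scatter_gather(
    scaled_sum,
    constant_args={"scale": 10},
    variable_args_list=[{"x": 1, "y": 2}, {"x": 3, "y": 4}],
)
```

Here scaled_sum is called twice, once with x=1, y=2 and once with x=3, y=4, each time with scale=10, so results has two entries. How the real function handles keys that appear in both constant_args and variable_args_list is not stated in this documentation.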

Parameters:
  • cluster_ticket (Object) – A ticket used to access a specific cluster, which can be obtained from get_cluster_ticket(). If cluster_ticket=None, the computation runs serially. See dask_jobqueue for more information.

  • func (callable) – A callable function with keyword arguments matching the entries in constant_args and variable_args_list.

  • constant_args (dictionary) – [Default={}] A dictionary of keyword arguments that are the same for all calls of func.

  • variable_args_list (list[dictionary]) – [Default=[]] A list of dictionaries of keyword arguments. Each dictionary contains arguments for one call of func.

  • min_nodes (int) – [Default=None] Requested minimum number of workers needed to start the parallel computation. The job does not start until the number of nodes is at least min_nodes, and once it starts, no further nodes are used. If None, num_nodes from cluster_ticket is used.

  • verbose (int) – [Default=1] Possible values are {0,1}, where 0 is quiet and 1 prints progress information about the parallel computation.
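A common way to fill variable_args_list is a sweep over every combination of a few parameter values. The hypothetical helper grid_to_args_list below, which is not part of mbircone, uses itertools.product to build one keyword dictionary per combination; the names weight and offset are placeholders for whatever keywords func actually accepts.

```python
import itertools
from typing import Any, Dict, Iterable, List


def grid_to_args_list(**grid: Iterable[Any]) -> List[Dict[str, Any]]:
    """Expand {name: values} into one keyword dictionary per combination.

    Hypothetical helper, not part of mbircone.
    """
    names = list(grid)
    value_lists = [list(grid[name]) for name in names]
    # itertools.product varies the last parameter fastest.
    return [dict(zip(names, combo)) for combo in itertools.product(*value_lists)]


# Placeholder keyword names; use the keywords your func actually accepts.
variable_args_list = grid_to_args_list(weight=[0.5, 1.0], offset=[0, 1, 2])
```

A 2 x 3 grid gives six dictionaries, so passing this list as variable_args_list would mean six calls of func and, per the Returns section, a six-element output list.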

Returns:

A list obtained by collecting the output from each call of func. The length of the output list is the length of variable_args_list. Each entry in the list will be the output of one call of func.