Source code for accre.slurm

"""
General purpose wrappers to call Slurm commands and put output
into a machine-friendly format if needed.
"""
import re
import subprocess

from accre.config import get_config
from accre.exceptions import ACCREError

# Package configuration; run_slurm_command reads CONFIG['slurm']['scheduler']
# to find the host to ssh to when ssh=True.
CONFIG = get_config()

# Matches a ``Key=`` token at the start of a line or preceded by one space.
# _fetch_scontrol_show splits each one-line scontrol record on this pattern
# to recover alternating keys and values.
SLURM_INFO_RE = re.compile(r'(^[A-Za-z_]+=| [A-Za-z_]+=)')


class ACCRESlurmError(ACCREError):
    """
    Raised when a slurm command run through this module fails
    """
def slurm_node_info(node, ssh=False):
    """
    Look up the slurm properties of a single node.

    :param str node: Name of the node to collect information about
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: slurm properties for the specified node
    :rtype: dict(str)
    """
    records = _fetch_scontrol_show(node, ssh=ssh)
    # Only the first parsed record is handed back to the caller
    return records[0]
def slurm_nodes_info_by_state(
    states=('alloc', 'idle'), ssh=False, hidden=False
):
    """
    Collect the slurm properties of every node that is currently in one
    of the requested states.

    :param list(str) states: States from which to accept nodes.
        Defaults to alloc and idle.
    :param bool ssh: Use ssh to run command on the remote scheduler
    :param bool hidden: Allow queries for nodes in hidden partitions
    :returns: slurm properties for each node in the cluster with one
        of the specified states.
    :rtype: list(dict(str))
    """
    # The node list comes back as a single slurm-formatted string that is
    # passed straight on to scontrol
    return _fetch_scontrol_show(
        _get_nodes_by_state(states, ssh=ssh, hidden=hidden),
        ssh=ssh,
    )
def list_compute_nodes(responding=True, ssh=False, hidden=False):
    """
    List the hostnames of all compute nodes, by default only those that
    are responding to the scheduler.

    :param bool responding: Only show responding nodes if true
    :param bool ssh: Use ssh to run command on the remote scheduler
    :param bool hidden: Include nodes in hidden partitions
    :returns: all compute nodes
    :rtype: list(str)
    """
    command = ['sinfo', '-o', '%n']
    for enabled, flag in ((hidden, '-a'), (responding, '-r')):
        if enabled:
            command.append(flag)
    listing = run_slurm_command(command, ssh=ssh)
    # The first line of the sinfo output is a header, not a hostname
    return listing.splitlines()[1:]
def get_default_groups(ssh=False):
    """
    Map every slurm username to that user's default group.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: default scheduler group for each user
    :rtype: dict(str, str)
    """
    listing = run_slurm_command(['sacctmgr', 'show', 'users', '-p'], ssh=ssh)
    rows = listing.splitlines()
    # First row of the pipe-delimited output names the columns
    header = [column.strip().lower() for column in rows[0].split('|')]
    defaults = {}
    for row in rows[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        default_account = cells[header.index('def acct')]
        username = cells[header.index('user')]
        defaults[username] = default_account
    return defaults
def get_runaway_jobs(ssh=False):
    """
    Run the sacctmgr runaway jobs check and return what it prints.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: raw output from the slurm command, stripped of
        surrounding whitespace
    """
    command = ['sacctmgr', 'show', 'runaway', '-n', '-p']
    return run_slurm_command(command, ssh=ssh).strip()
def get_slurm_associations(
    user=None, regular=True, accelerated=False, ssh=False
):
    """
    Return a list of dicts each with information about a slurm
    association. Every dict has the keys cluster, account, user,
    partition, fairshare, qos, max_cpu, max_mem and max_runmins.

    Fields that are unset (empty) in slurm for the association are set
    to None. max_cpu and max_runmins are ints if set. fairshare is an
    int when the Share column is numeric; a non-numeric value (for
    example ``parent``) is returned unchanged as a string.

    :param str user: If not None, only return associations for the
        specified user
    :param bool regular: Show associations for regular (non-accelerated)
        partitions, i.e. those whose slurm account does not end
        in "_acc".
    :param bool accelerated: Show associations for accelerated
        partitions, i.e. those whose slurm account ends in "_acc".
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: list of association information dicts; empty if the
        command produced no output
    :rtype: list(dict)
    """
    arglist = ['sacctmgr', 'show', 'associations']
    if user is not None:
        arglist.append('user={0}'.format(user))
    arglist.append('-p')
    output = run_slurm_command(arglist, ssh=ssh)

    lines = output.splitlines()
    if not lines:
        # No header at all: nothing to parse
        return []
    fields = [f.strip().lower() for f in lines[0].split('|')]

    result = []
    for line in lines[1:]:
        if not line.strip():
            # A blank row has no columns to look up
            continue
        assoc_fields = [f.strip() for f in line.split('|')]

        def column(name, row=assoc_fields):
            """Return the value of the named header column in this row."""
            return row[fields.index(name)]

        assoc = {}
        assoc['cluster'] = column('cluster')
        assoc['account'] = column('account')
        is_accelerated = assoc['account'].endswith('_acc')
        if is_accelerated and not accelerated:
            continue
        if not is_accelerated and not regular:
            continue
        assoc['user'] = column('user')
        assoc['partition'] = column('partition')

        share = column('share')
        try:
            assoc['fairshare'] = int(share)
        except ValueError:
            # Empty becomes None below; values such as "parent" are
            # kept verbatim rather than aborting the whole listing
            assoc['fairshare'] = share
        assoc['qos'] = column('qos')

        max_cpu, max_mem, max_runmins = '', '', ''
        for item in column('grptres').split(','):
            if item.startswith('cpu'):
                max_cpu = int(item.split('=')[1])
            if item.startswith('mem'):
                max_mem = item.split('=')[1]
        for item in column('grptresrunmins').split(','):
            if item.startswith('cpu'):
                max_runmins = int(item.split('=')[1])
        assoc['max_cpu'] = max_cpu
        assoc['max_mem'] = max_mem
        assoc['max_runmins'] = max_runmins

        # Normalise every unset (empty string) field to None
        for key in assoc:
            if assoc[key] == '':
                assoc[key] = None
        result.append(assoc)
    return result
def get_acc_qos_records(ssh=False):
    """
    Return a list of dicts describing the slurm QOS records for
    accelerated partitions. Each dict has the keys partition, group,
    flags and gpus, with empty values replaced by None.

    Only QOS records named "<group>_<partition>_acc" are reported;
    these are expected to correspond to accelerated partitions.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: list of QOS information dicts
    :rtype: list(dict)
    """
    listing = run_slurm_command(['sacctmgr', 'show', 'qos', '-p'], ssh=ssh)
    rows = listing.splitlines()
    header = [column.strip().lower() for column in rows[0].split('|')]
    records = []
    for row in rows[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        qos_name = cells[header.index('name')]
        parts = qos_name.split('_')
        if not (qos_name.endswith('_acc') and len(parts) > 2):
            continue
        record = {
            'partition': parts[-2],
            # The group itself may contain underscores
            'group': '_'.join(parts[:-2]),
            'flags': cells[header.index('flags')],
        }
        gpu_limit = ''
        for entry in cells[header.index('grptres')].split(','):
            if entry.startswith('gres/gpu'):
                gpu_limit = int(entry.split('=')[1])
        record['gpus'] = gpu_limit
        records.append({
            key: (None if value == '' else value)
            for key, value in record.items()
        })
    return records
def create_acc_qos(group, partition, gpus=2, flags='OverPartQOS', ssh=False):
    """
    Create the QOS record named "<group>_<partition>_acc" for use with
    a group and an accelerated partition.

    :param str group: group that this QOS should be used for
    :param str partition: partition that this QOS should be used for
    :param int gpus: maximum gpu resources for this QOS
    :param str flags: comma separated list of flags for the QOS
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    gpu_limit = 'grptres=gres/gpu={0}'.format(gpus)
    qos_flags = 'flags={0}'.format(flags)
    record_name = '{0}_{1}_acc'.format(group, partition)
    run_slurm_command(
        ['sacctmgr', '-i', 'create', 'qos', record_name, gpu_limit, qos_flags],
        ssh=ssh,
    )
def delete_acc_qos(group, partition, ssh=False):
    """
    Delete the QOS record named "<group>_<partition>_acc" belonging to
    a group and an accelerated partition.

    :param str group: group for this QOS
    :param str partition: partition for this QOS
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    record_name = '{0}_{1}_acc'.format(group, partition)
    run_slurm_command(
        ['sacctmgr', '-i', 'delete', 'qos', record_name], ssh=ssh
    )
def modify_acc_qos_gpus(group, partition, gpus, ssh=False):
    """
    Change the number of allowed gpus on the QOS record named
    "<group>_<partition>_acc".

    :param str group: group for this QOS
    :param str partition: partition for this QOS
    :param int gpus: The new number of allowed gpus
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    gpu_limit = 'grptres=gres/gpu={0}'.format(gpus)
    record_name = '{0}_{1}_acc'.format(group, partition)
    run_slurm_command(
        ['sacctmgr', '-i', 'modify', 'qos', record_name, 'set', gpu_limit],
        ssh=ssh,
    )
def create_slurm_association(user, group, partition,
    fairshare=None, max_cpu=None, max_mem=None, qos=None, ssh=False
):
    """
    Add a slurm association for the given user, group and partition,
    applying whichever optional limits were supplied.

    :param str user: ACCRE user (VUNetID) to create the association for
    :param str group: The group for the association
    :param str partition: The cluster partition for the association
    :param int fairshare: Optional fairshare for this association
    :param int max_cpu: Optional maximum cores for this association
    :param str max_mem: Optional maximum memory for this association
    :param str qos: Optional qos for this association
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    command = [
        'sacctmgr', 'add', 'user', user, '-i',
        'Account={0}'.format(group),
        'Partition={0}'.format(partition),
    ]
    # Optional settings are only passed along when the caller gave them
    optional = (('Share={0}', fairshare), ('QOS={0}', qos))
    command.extend(
        template.format(value)
        for template, value in optional
        if value is not None
    )
    limits = [
        template.format(value)
        for template, value in (('cpu={0}', max_cpu), ('mem={0}', max_mem))
        if value is not None
    ]
    if limits:
        command.append('GrpTRES="{0}"'.format(','.join(limits)))
    run_slurm_command(command, ssh=ssh)
def delete_slurm_association(user, group, partition, ssh=False):
    """
    Remove the slurm association for the given user, group and partition.

    :param str user: ACCRE user (VUNetID) for the association
    :param str group: The group for the association
    :param str partition: The cluster partition for the association
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    account_filter = 'Account={0}'.format(group)
    partition_filter = 'Partition={0}'.format(partition)
    run_slurm_command(
        [
            'sacctmgr', 'delete', 'user', user, '-i',
            account_filter, partition_filter,
        ],
        ssh=ssh,
    )
def set_default_group(user, group, ssh=False):
    """
    Change a user's default group (slurm default account).

    :param str user: ACCRE user (VUNetID)
    :param str group: The desired default group
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    who = 'name={0}'.format(user)
    new_default = 'defaultaccount={0}'.format(group)
    run_slurm_command(
        ['sacctmgr', 'modify', 'user', '-i', who, 'set', new_default],
        ssh=ssh,
    )
def get_slurm_users(ssh=False):
    """
    Return a list of dicts, one per slurm user, holding the user's
    name, default account (default), and whether the user has
    administrative access (admin).

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: list of user information dicts
    :rtype: list(dict)
    """
    listing = run_slurm_command(['sacctmgr', 'show', 'users', '-p'], ssh=ssh)
    rows = listing.splitlines()
    header = [column.strip().lower() for column in rows[0].split('|')]
    users = []
    for row in rows[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        name = cells[header.index('user')]
        default_account = cells[header.index('def acct')]
        # Only the exact level "Administrator" counts as admin access
        is_admin = cells[header.index('admin')] == 'Administrator'
        users.append(
            {'name': name, 'default': default_account, 'admin': is_admin}
        )
    return users
def groups_by_account(ssh=False):
    """
    Group slurm accounts by their organization. The result is keyed by
    the value of the "org" column of ``sacctmgr show account`` and each
    value lists the slurm accounts (groups) under it, leaving out any
    whose name ends in "_account".

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: All groups for each account
    :rtype: dict(str, list(str))
    """
    listing = run_slurm_command(
        ['sacctmgr', 'show', 'account', '-p'], ssh=ssh
    )
    rows = listing.splitlines()
    header = [column.strip().lower() for column in rows[0].split('|')]
    grouped = {}
    for row in rows[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        organization = cells[header.index('org')]
        account = cells[header.index('account')]
        # The organization gets an entry even if nothing is listed under it
        members = grouped.setdefault(organization, [])
        if not account.endswith('_account'):
            members.append(account)
    return grouped
def get_sdiag_infor(options, ssh=False):
    """
    Run sdiag with caller-supplied options and return what it prints.

    :param list(str) options: extra command line options for sdiag
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: raw output from sdiag, stripped of surrounding whitespace
    """
    command = ['sdiag'] + options
    return run_slurm_command(command, ssh=ssh).strip()
def get_sacctmgr_status(options, ssh=False):
    """
    Run ``sacctmgr show stats`` with caller-supplied options and return
    what it prints.

    :param list(str) options: extra command line options for the
        sacctmgr check
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: raw output from the sacctmgr status check, stripped of
        surrounding whitespace
    """
    command = ['sacctmgr', 'show', 'stats'] + options
    return run_slurm_command(command, ssh=ssh).strip()
def get_sacct_infor(options, ssh=False):
    """
    Run sacct with caller-supplied options and return its output
    line by line. The command is given a 240 second timeout.

    :param list(str) options: extra command line options for sacct
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: the lines of the sacct output
    :rtype: list(str)
    """
    command = ['sacct'] + options
    listing = run_slurm_command(command, ssh=ssh, timeout=240)
    return listing.splitlines()
def get_squeue_infor(options, ssh=False):
    """
    Run squeue with caller-supplied options and return its output
    line by line.

    :param list(str) options: extra command line options for squeue
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: the lines of the squeue output
    :rtype: list(str)
    """
    command = ['squeue'] + options
    listing = run_slurm_command(command, ssh=ssh)
    return listing.splitlines()
def get_nodes_infor(options, ssh=False):
    """
    Run sinfo with caller-supplied options and return its output line
    by line, e.g. to gather the node and core information of a
    partition for a cluster utilization graph. As with get_sacct_infor,
    the options come from the caller, who thereby chooses the data
    fields to report.

    :param list(str) options: options passed to the sinfo command to
        get the nodes/cores information
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: the lines of the sinfo output
    :rtype: list(str)
    """
    command = ['sinfo'] + options
    listing = run_slurm_command(command, ssh=ssh)
    return listing.splitlines()
def slurm_version(ssh=False):
    """
    Return the current slurm version, taken from the second
    whitespace-separated word printed by ``squeue --version``.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: the slurm version
    :rtype: str
    :raises ACCRESlurmError: if the command output has fewer than
        two words
    """
    # squeue is used simply as a convenient way to get the version
    arglist = ['squeue', '--version']
    output = run_slurm_command(arglist, ssh=ssh)
    words = output.split()
    if len(words) <= 1:
        raise ACCRESlurmError(
            'Error running slurm command to get slurm version {0}: {1}'
            .format(arglist, output)
        )
    return words[1]
def _fetch_scontrol_show(nodes, ssh=False):
    """
    Run ``scontrol show nodes`` for the given nodes and parse each
    output line into a dict of its ``Key=value`` pairs.
    """
    command = ['scontrol', 'show', 'nodes', nodes, '-a', '-d', '-o']
    listing = run_slurm_command(command, ssh=ssh)
    parsed = []
    for record in listing.splitlines():
        # Splitting on a capturing pattern alternates text / key / value,
        # so keys sit at the odd positions and values at the even ones
        pieces = SLURM_INFO_RE.split(record)
        names = [piece.strip(' =') for piece in pieces[1::2]]
        parsed.append(dict(zip(names, pieces[2::2])))
    return parsed


def _get_nodes_by_state(states, ssh=False, hidden=False):
    """
    Ask sinfo for all nodes in any of the given states and return its
    slurm-formatted node list as a single stripped string.
    """
    command = [
        'sinfo',
        '--states=' + ','.join(states),
        '--format=%N',
        '--noheader',
    ]
    if hidden:
        command.append('-a')
    return run_slurm_command(command, ssh=ssh).strip()
def run_slurm_command(arglist, ssh=False, timeout=60):
    """
    Run a specified slurm command with arguments and return
    standard output decoded to UTF-8.

    If ssh is set to True, ssh to the configured slurm server to
    run the command. This requires either that non-interactive
    authentication is set up or that this is run interactively.

    :param list(str) arglist: List of slurm command and arguments. Note
        that this will not be interpreted by a shell. The list is not
        modified.
    :param bool ssh: Use ssh to run command on the remote scheduler.
    :param int timeout: set the timeout to this many seconds (default 60)
    :returns: Output of the command decoded to utf-8
    :rtype: str
    :raises ACCRESlurmError: if the command exits with a non-zero status
    :raises subprocess.TimeoutExpired: if the command does not finish
        within the timeout; the child process is killed first
    """
    # Work on a copy so that prepending the ssh prefix never alters the
    # list object owned by the caller
    command = list(arglist)
    if ssh:
        command[0:0] = ['ssh', CONFIG['slurm']['scheduler']]
    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdin=subprocess.DEVNULL
    )
    try:
        stdout, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # communicate() does not stop the child on timeout; kill and reap
        # it so that no process or pipe is left behind
        proc.kill()
        proc.communicate()
        raise
    if proc.returncode != 0:
        raise ACCRESlurmError(
            'Error running slurm command {0}: {1}'.format(
                command, stderr.decode('utf-8', errors='replace')
            )
        )
    return stdout.decode('utf-8')
# Older scripts imported this function under its former private name
# (with a leading underscore), so that name is kept as an alias to avoid
# breaking them.
_run_slurm_command = run_slurm_command