"""
General purpose wrappers to call Slurm commands and put output
into a machine-friendly format if needed.
"""
import re
import subprocess
from accre.config import get_config
from accre.exceptions import ACCREError
# Package configuration, loaded once at import time. run_slurm_command reads
# CONFIG['slurm']['scheduler'] as the ssh target when called with ssh=True.
CONFIG = get_config()
# Matches a "Key=" token either at the very start of a line or preceded by a
# single space. The capturing group makes re.split() keep the matched keys in
# its result, which _fetch_scontrol_show relies on to pair keys with values.
SLURM_INFO_RE = re.compile(r'(^[A-Za-z_]+=| [A-Za-z_]+=)')
class ACCRESlurmError(ACCREError):
    """
    Raised when running a slurm command fails.
    """
def slurm_node_info(node, ssh=False):
    """
    Return a dictionary with the slurm properties of a specified node.

    :param str node: Node to collect information about
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: slurm properties for the specified node
    :rtype: dict(str)
    """
    # The scontrol helper always returns a list of records; only the first
    # one is handed back to the caller.
    records = _fetch_scontrol_show(node, ssh=ssh)
    return records[0]
def slurm_nodes_info_by_state(
    states=('alloc', 'idle'),
    ssh=False,
    hidden=False
):
    """
    Return a list of dictionaries with the slurm properties of each node
    in the cluster that has one of the specified states.

    :param list(str) states: List of states from which to accept nodes
        for information collection. Defaults to alloc and idle.
    :param bool ssh: Use ssh to run command on the remote scheduler
    :param bool hidden: Allow queries for nodes in hidden partitions
    :returns: slurm properties for each node in the cluster with one of the
        specified states. Empty if no node is in any of the states.
    :rtype: list(dict(str))
    """
    nodes = _get_nodes_by_state(states, ssh=ssh, hidden=hidden)
    if not nodes:
        # No node matched. Passing an empty node expression on to scontrol
        # would no longer restrict the query to the requested states (and
        # over ssh the empty argument disappears entirely), so stop here.
        return []
    return _fetch_scontrol_show(nodes, ssh=ssh)
def list_compute_nodes(responding=True, ssh=False, hidden=False):
    """
    List every compute node known to the scheduler; unless told otherwise
    only nodes that are responding are included.

    :param bool responding: Only show responding nodes if true
    :param bool ssh: Use ssh to run command on the remote scheduler
    :param bool hidden: Include nodes in hidden partitions
    :returns: all compute nodes
    :rtype: list(str)
    """
    command = ['sinfo', '-o', '%n']
    # Keep the flag order of the command line stable: -a before -r.
    for enabled, flag in ((hidden, '-a'), (responding, '-r')):
        if enabled:
            command.append(flag)
    lines = run_slurm_command(command, ssh=ssh).splitlines()
    # The first line of the output is a header, not a node name.
    return lines[1:]
def get_default_groups(ssh=False):
    """
    Map every slurm username to that user's default group.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: default scheduler group for each user
    :rtype: dict(str, str)
    """
    raw = run_slurm_command(['sacctmgr', 'show', 'users', '-p'], ssh=ssh)
    lines = raw.splitlines()
    # The first line holds the pipe separated column titles.
    columns = [title.strip().lower() for title in lines[0].split('|')]
    defaults = {}
    for row in lines[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        default_account = cells[columns.index('def acct')]
        username = cells[columns.index('user')]
        defaults[username] = default_account
    return defaults
def get_runaway_jobs(ssh=False):
    """
    Run the sacctmgr runaway jobs check and return what it printed.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: raw output from the slurm command, with surrounding
        whitespace removed
    """
    command = ['sacctmgr', 'show', 'runaway', '-n', '-p']
    return run_slurm_command(command, ssh=ssh).strip()
def _tres_limit(tres, prefix):
    """
    Return the value of the last ``name=value`` item of a comma separated
    TRES string whose name starts with ``prefix``, or '' if there is none.
    """
    value = ''
    for item in tres.split(','):
        if item.startswith(prefix):
            value = item.partition('=')[2]
    return value


def get_slurm_associations(
    user=None,
    regular=True,
    accelerated=False,
    ssh=False
):
    """
    Return a list of dicts each with information about a slurm association.

    Every dict has the keys cluster, account, user, partition, fairshare,
    qos, max_cpu, max_mem and max_runmins. Fields that are unset in slurm
    for the association are set to None. max_cpu and max_runmins are ints
    if set. fairshare is an int when slurm reports a number; any other
    non-empty value (for example "parent") is returned unchanged as a
    string.

    :param str user: If not None, only return associations for the
        specified user
    :param bool regular: Show associations for regular (non-accelerated)
        partitions. These should all involve slurm accounts not ending
        in "_acc".
    :param bool accelerated: Show associations for accelerated partitions.
        These should all involve slurm accounts ending in "_acc".
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: list of association information dicts
    :rtype: list(dict)
    """
    arglist = ['sacctmgr', 'show', 'associations']
    if user is not None:
        arglist.append('user={0}'.format(user))
    arglist.append('-p')
    lines = run_slurm_command(arglist, ssh=ssh).splitlines()
    if not lines:
        return []

    fields = [f.strip().lower() for f in lines[0].split('|')]
    # Column positions do not change between rows, so look them up once.
    col = {
        name: fields.index(name)
        for name in (
            'cluster', 'account', 'user', 'partition', 'share', 'qos',
            'grptres', 'grptresrunmins',
        )
    }

    result = []
    for line in lines[1:]:
        if not line.strip():
            continue
        cells = [f.strip() for f in line.split('|')]
        account = cells[col['account']]
        is_accelerated = account.endswith('_acc')
        if is_accelerated and not accelerated:
            continue
        if not is_accelerated and not regular:
            continue

        # The share column is not guaranteed to be numeric (it may be
        # empty or hold a keyword), so only convert when it parses.
        fairshare = cells[col['share']]
        try:
            fairshare = int(fairshare)
        except ValueError:
            pass

        max_cpu = _tres_limit(cells[col['grptres']], 'cpu')
        max_mem = _tres_limit(cells[col['grptres']], 'mem')
        max_runmins = _tres_limit(cells[col['grptresrunmins']], 'cpu')

        assoc = {
            'cluster': cells[col['cluster']],
            'account': account,
            'user': cells[col['user']],
            'partition': cells[col['partition']],
            'fairshare': fairshare,
            'qos': cells[col['qos']],
            'max_cpu': int(max_cpu) if max_cpu else '',
            'max_mem': max_mem,
            'max_runmins': int(max_runmins) if max_runmins else '',
        }
        # Report unset fields as None rather than as empty strings.
        result.append(
            {key: (None if val == '' else val) for key, val in assoc.items()}
        )
    return result
def get_acc_qos_records(ssh=False):
    """
    Collect the slurm QOS records that belong to accelerated partitions.

    Only QOS records named "<group>_<partition>_acc" are reported; these
    are expected to correspond to accelerated partitions. Each record is a
    dict with the keys partition, group, flags and gpus, where empty
    values are replaced by None and gpus is an int if set.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: list of QOS information dicts
    :rtype: list(dict)
    """
    raw = run_slurm_command(['sacctmgr', 'show', 'qos', '-p'], ssh=ssh)
    lines = raw.splitlines()
    columns = [title.strip().lower() for title in lines[0].split('|')]
    records = []
    for row in lines[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        qos_name = cells[columns.index('name')]
        pieces = qos_name.split('_')
        # Need at least <group>, <partition> and the trailing "acc".
        if len(pieces) <= 2 or not qos_name.endswith('_acc'):
            continue
        record = {
            'partition': pieces[-2],
            'group': '_'.join(pieces[:-2]),
            'flags': cells[columns.index('flags')],
        }
        gpu_limit = ''
        for entry in cells[columns.index('grptres')].split(','):
            if entry.startswith('gres/gpu'):
                gpu_limit = int(entry.split('=')[1])
        record['gpus'] = gpu_limit
        records.append({
            key: (None if value == '' else value)
            for key, value in record.items()
        })
    return records
def create_acc_qos(group, partition, gpus=2, flags='OverPartQOS', ssh=False):
    """
    Create the QOS record that ties a group to an accelerated partition,
    for use in associations involving that group.

    The record is named "<group>_<partition>_acc".

    :param str group: group that this QOS should be used for
    :param str partition: partition that this QOS should be used for
    :param int gpus: maximum gpu resources for this QOS
    :param str flags: comma separated list of flags for the QOS
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    command = ['sacctmgr', '-i', 'create', 'qos']
    command.append('{}_{}_acc'.format(group, partition))
    command.append('grptres=gres/gpu={}'.format(gpus))
    command.append('flags={}'.format(flags))
    run_slurm_command(command, ssh=ssh)
def delete_acc_qos(group, partition, ssh=False):
    """
    Remove the QOS record of a group on an accelerated partition.

    The record removed is the one named "<group>_<partition>_acc".

    :param str group: group for this QOS
    :param str partition: partition for this QOS
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    target = '{}_{}_acc'.format(group, partition)
    run_slurm_command(['sacctmgr', '-i', 'delete', 'qos', target], ssh=ssh)
def modify_acc_qos_gpus(group, partition, gpus, ssh=False):
    """
    Change the number of gpus allowed by the QOS record of a group on an
    accelerated partition.

    The record modified is the one named "<group>_<partition>_acc".

    :param str group: group for this QOS
    :param str partition: partition for this QOS
    :param int gpus: The new number of allowed gpus
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    target = '{}_{}_acc'.format(group, partition)
    new_limit = 'grptres=gres/gpu={}'.format(gpus)
    command = ['sacctmgr', '-i', 'modify', 'qos', target, 'set', new_limit]
    run_slurm_command(command, ssh=ssh)
def create_slurm_association(user, group, partition,
                             fairshare=None,
                             max_cpu=None,
                             max_mem=None,
                             qos=None,
                             ssh=False
                             ):
    """
    Add a slurm association for a user, group and partition, applying
    whichever of the optional settings were supplied.

    :param str user: ACCRE user (VUNetID) to create the association for
    :param str group: The group for the association
    :param str partition: The cluster partition for the association
    :param int fairshare: Optional fairshare for this association
    :param int max_cpu: Optional maximum cores for this association
    :param str max_mem: Optional maximum memory for this association
    :param str qos: Optional qos for this association
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    command = ['sacctmgr', 'add', 'user', user, '-i']
    command.append('Account={}'.format(group))
    command.append('Partition={}'.format(partition))

    # Optional settings are only passed on when the caller supplied them.
    for option, value in (('Share', fairshare), ('QOS', qos)):
        if value is not None:
            command.append('{}={}'.format(option, value))

    # cpu and memory limits travel together in a single GrpTRES argument.
    limits = [
        '{}={}'.format(resource, value)
        for resource, value in (('cpu', max_cpu), ('mem', max_mem))
        if value is not None
    ]
    if limits:
        command.append('GrpTRES="{}"'.format(','.join(limits)))

    run_slurm_command(command, ssh=ssh)
def delete_slurm_association(user, group, partition, ssh=False):
    """
    Remove the slurm association of a user with a group and partition.

    :param str user: ACCRE user (VUNetID) for the association
    :param str group: The group for the association
    :param str partition: The cluster partition for the association
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    account_arg = 'Account={}'.format(group)
    partition_arg = 'Partition={}'.format(partition)
    command = ['sacctmgr', 'delete', 'user', user, '-i']
    command.extend([account_arg, partition_arg])
    run_slurm_command(command, ssh=ssh)
def set_default_group(user, group, ssh=False):
    """
    Make the given group the default slurm account of a user.

    :param str user: ACCRE user (VUNetID)
    :param str group: The desired default group
    :param bool ssh: Use ssh to run command on the remote scheduler
    """
    who = 'name={}'.format(user)
    new_default = 'defaultaccount={}'.format(group)
    command = ['sacctmgr', 'modify', 'user', '-i', who, 'set', new_default]
    run_slurm_command(command, ssh=ssh)
def get_slurm_users(ssh=False):
    """
    Describe every slurm user as a dict with the user's name, default
    account (default) and whether the user has administrative access
    (admin).

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: list of user information dicts
    :rtype: list(dict)
    """
    raw = run_slurm_command(['sacctmgr', 'show', 'users', '-p'], ssh=ssh)
    lines = raw.splitlines()
    columns = [title.strip().lower() for title in lines[0].split('|')]
    users = []
    for row in lines[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        name = cells[columns.index('user')]
        default_account = cells[columns.index('def acct')]
        # Only the exact "Administrator" level counts as admin access.
        is_admin = cells[columns.index('admin')] == 'Administrator'
        users.append({
            'name': name,
            'default': default_account,
            'admin': is_admin,
        })
    return users
def groups_by_account(ssh=False):
    """
    Group the slurm groups by the ACCRE slurm account they belong to.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: All groups for each account
    :rtype: dict(str, list(str))
    """
    raw = run_slurm_command(['sacctmgr', 'show', 'account', '-p'], ssh=ssh)
    lines = raw.splitlines()
    columns = [title.strip().lower() for title in lines[0].split('|')]
    grouped = {}
    for row in lines[1:]:
        cells = [cell.strip() for cell in row.split('|')]
        organization = cells[columns.index('org')]
        account = cells[columns.index('account')]
        # Every organization gets an entry, even when all of its rows are
        # "*_account" records that are left out of the list.
        members = grouped.setdefault(organization, [])
        if account.endswith('_account'):
            continue
        members.append(account)
    return grouped
def get_sdiag_infor(options, ssh=False):
    """
    Run sdiag with caller supplied options and return what it printed.

    :param options: options for sdiag, as a list of strings
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: raw output from sdiag, with surrounding whitespace removed
    """
    command = ['sdiag'] + options
    return run_slurm_command(command, ssh=ssh).strip()
def get_sacctmgr_status(options, ssh=False):
    """
    Run the sacctmgr statistics check ("sacctmgr show stats") with caller
    supplied options and return what it printed.

    :param options: options for the sacctmgr check, as a list of strings
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: raw output from the sacctmgr status check, with surrounding
        whitespace removed
    """
    command = ['sacctmgr', 'show', 'stats'] + options
    return run_slurm_command(command, ssh=ssh).strip()
def get_sacct_infor(options, ssh=False):
    """
    Run sacct with caller supplied options and return its output lines.

    :param options: options for running the sacct command, as a list of
        strings
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: output of sacct, one list entry per line
    """
    command = ['sacct'] + options
    # sacct is given a longer timeout (240 seconds) than the default.
    raw = run_slurm_command(command, ssh=ssh, timeout=240)
    return raw.splitlines()
def get_squeue_infor(options, ssh=False):
    """
    Run squeue with caller supplied options and return its output lines.

    :param options: options for running the squeue command, as a list of
        strings
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: output of squeue, one list entry per line
    """
    command = ['squeue'] + options
    raw = run_slurm_command(command, ssh=ssh)
    return raw.splitlines()
def get_nodes_infor(options, ssh=False):
    """
    Run sinfo with caller supplied options and return its output lines.

    This is meant for collecting node and core information of a
    partition, for example to draw a cluster utilization graph. As with
    get_sacct_infor the options come from the caller, who is therefore
    free to choose the data fields.

    :param options: options passed to the sinfo command to get the
        nodes/cores information
    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: output of sinfo, one list entry per line
    """
    command = ['sinfo'] + options
    raw = run_slurm_command(command, ssh=ssh)
    return raw.splitlines()
def slurm_version(ssh=False):
    """
    Return the current slurm version as reported by ``squeue --version``.

    The version is taken to be the second whitespace separated token of
    the command output.

    :param bool ssh: Use ssh to run command on the remote scheduler
    :returns: slurm version string
    :rtype: str
    :raises ACCRESlurmError: if the output has fewer than two tokens
    """
    # use squeue command to simply get the slurm version information
    arglist = ['squeue', '--version']
    output = run_slurm_command(arglist, ssh=ssh)
    tokens = output.split()
    if len(tokens) <= 1:
        raise ACCRESlurmError(
            'Error running slurm command to get slurm version {0}: {1}'.format(arglist, output)
        )
    return tokens[1]
def _fetch_scontrol_show(nodes, ssh=False):
    """
    Ask scontrol about the specified nodes and return the parsed result
    as a list with one dict of properties per output line.
    """
    command = ['scontrol', 'show', 'nodes', nodes, '-a', '-d', '-o']
    parsed = []
    for record in run_slurm_command(command, ssh=ssh).splitlines():
        # Splitting on the "Key=" pattern (which has a capturing group)
        # yields: leading text, key, value, key, value, ...
        pieces = SLURM_INFO_RE.split(record)
        names = [piece.strip(' =') for piece in pieces[1::2]]
        values = pieces[2::2]
        parsed.append(dict(zip(names, values)))
    return parsed
def _get_nodes_by_state(states, ssh=False, hidden=False):
    """
    Ask sinfo for all nodes in any of the given states and return them
    as a single slurm-formatted node list string.
    """
    command = [
        'sinfo',
        '--states=' + ','.join(states),
        '--format=%N',
        '--noheader',
    ]
    if hidden:
        command.append('-a')
    return run_slurm_command(command, ssh=ssh).strip()
def run_slurm_command(arglist, ssh=False, timeout=60):
    """
    Run a specified slurm command with arguments and return
    standard output decoded to UTF-8.

    If ssh is set to True, ssh to the configured slurm server to
    run the command. This requires either that non-interactive
    authentication is set up or that this is run interactively.

    The list passed in as ``arglist`` is never modified.

    :param list(str) arglist: List of slurm command and arguments.
        Note that this will not be interpreted by a shell.
    :param bool ssh: Use ssh to run command on the remote scheduler.
    :param int timeout: set the timeout to this many seconds
        (default 60)
    :returns: Output of the command decoded to utf-8
    :rtype: str
    :raises ACCRESlurmError: if the command exits with a non-zero status
    :raises subprocess.TimeoutExpired: if the command does not finish
        within ``timeout`` seconds; the child process is killed first
    """
    # Work on a copy: prepending the ssh prefix in place would leak into
    # the caller's list and stack up if that list were reused.
    command = list(arglist)
    if ssh:
        command = ['ssh', CONFIG['slurm']['scheduler']] + command
    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        stdin=subprocess.DEVNULL
    )
    try:
        stdout, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # communicate() does not stop the child on timeout; kill and reap
        # it so no process or pipe is left behind, then report the timeout.
        proc.kill()
        proc.communicate()
        raise
    if proc.returncode != 0:
        raise ACCRESlurmError(
            'Error running slurm command {0}: {1}'.format(
                command, stderr.decode('utf-8', errors='replace')
            )
        )
    return stdout.decode('utf-8')
# Backwards-compatibility alias: old scripts imported run_slurm_command
# under its former private name (with a leading underscore), back when it
# was private to this module. Keep that name available so they keep working.
_run_slurm_command = run_slurm_command