Source code for accre.gpfs

"""
Wrappers to call GPFS commands
"""
import re
import subprocess

from accre.exceptions import ACCREError
from accre.config import get_config
from accre.util import convert_byte_unit


# Configuration is loaded once, when this module is imported.
# run_gpfs_command reads the 'gpfs' section of it to choose the node to
# ssh to.
CONFIG = get_config()


class GPFSCommandError(ACCREError):
    """
    Raised when a GPFS command exits with a failure status or when its
    output cannot be parsed.
    """
def get_filesystem_info(filesystem=None):
    """
    Runs mmlsfs command, which returns various attributes of the
    filesystem being queried (execute "man mmlsfs" from any box with
    GPFS installed for full details).

    :param str filesystem: the GPFS filesystem to query, required even
        though it has a default of None
    :returns: dictionary of filesystem attributes and their values
    :raises ValueError: if filesystem is None
    :raises GPFSCommandError: if mmlsfs exits with a non-zero status, the
        message is the standard error output of the command
    """
    if filesystem is None:
        raise ValueError('filesystem must be specified')
    # stderr must be captured explicitly, otherwise there is nothing to
    # report in the exception when the command fails
    proc = subprocess.run(
        ["/usr/lpp/mmfs/bin/mmlsfs", filesystem, "-Y"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if proc.returncode != 0:
        raise GPFSCommandError(
            proc.stderr.decode("utf-8", errors="replace").strip()
        )
    info = {}
    # The first line of -Y output is the header row, skip it
    for line in proc.stdout.decode("utf-8").splitlines()[1:]:
        fields = line.split(":")
        # Blank or truncated lines carry no attribute name/value pair
        if len(fields) < 9:
            continue
        info[fields[7]] = fields[8]
    return info
def get_filesystem_quotas(quota_type, quota_target, filesystem=None):
    """
    Runs mmlsquota command, which returns quota information for the
    specified user, group, or fileset as appropriate (execute
    "man mmlsquota" from any box with GPFS installed for full details).

    :param str quota_type: -g for group, -j for fileset, -u for user
    :param str quota_target: group, fileset, or user to be looked up
    :param str filesystem: filesystem to report on, required even though
        it has a default of None
    :returns: dict keyed by fileset where each value is a dict of quota
        information for that fileset
    :raises ValueError: if quota_type is not one of the accepted flags or
        if filesystem is None
    :raises GPFSCommandError: if mmlsquota exits with a non-zero status,
        the message is the standard error output of the command
    """
    if quota_type not in ('-g', '-j', '-u'):
        raise ValueError('quota_type must be "-g" (group), "-j" (fileset), or "-u" (user)')
    if filesystem is None:
        raise ValueError('filesystem must be specified')
    # stderr must be captured explicitly, otherwise there is nothing to
    # report in the exception when the command fails
    proc = subprocess.run(
        [
            "/usr/lpp/mmfs/bin/mmlsquota", quota_type, quota_target,
            "-Y", "-v", filesystem
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if proc.returncode != 0:
        raise GPFSCommandError(
            proc.stderr.decode("utf-8", errors="replace").strip()
        )
    quota = {}
    # The first line of -Y output is the header row, skip it
    for line in proc.stdout.decode("utf-8").splitlines()[1:]:
        if not line.strip():
            continue
        fields = line.split(":")
        entry = {
            'blockUsage': int(fields[10]),
            'blockQuota': int(fields[11]),
            'blockLimit': int(fields[12]),
            'blockGrace': fields[14],
            'filesUsage': int(fields[15]),
            'filesQuota': int(fields[16]),
            'filesLimit': int(fields[17]),
            'filesGrace': fields[19],
        }
        # For fileset queries the fileset name is read from field 9,
        # for user and group queries it is read from field 22
        fileset = fields[9] if quota_type == "-j" else fields[22]
        # An empty fileset name is reported under the key "root"
        quota[fileset or "root"] = entry
    return quota
def set_fileset_quota(
    filesystem, fileset, block_quota, block_limit,
    file_quota=None, file_limit=None, ssh=False, cluster='accre2'
):
    """
    Apply a soft block quota and a hard block limit to a fileset with
    mmsetquota, and optionally a file quota and limit as well.

    The file quota and limit are only passed to mmsetquota when both are
    given. If either one is None the file quotas are left unchanged.

    :param str filesystem: Filesystem that the fileset belongs to
    :param str fileset: Name of the GPFS fileset
    :param str block_quota: Soft block quota (should be in units of
        k, M, G, T, etc..)
    :param str block_limit: Hard block quota/limit (should be in units
        of k, M, G, T, etc..)
    :param str file_quota: Soft file quota, or no quota change if None
    :param str file_limit: Hard file limit, or no quota change if None
    :param bool ssh: If True, ssh out to the configured node running
        gpfs to run the command. If False, run locally.
    :param str cluster: GPFS cluster that contains the fileset, defaults
        to accre2. You cannot set quotas on a remote cluster so the node
        running this function or sshed to must be in the same cluster as
        the filesystem.
    """
    cmd = [
        '/usr/lpp/mmfs/bin/mmsetquota',
        f'{filesystem}:{fileset}',
        '--block',
        f'{block_quota}:{block_limit}',
    ]
    if not (file_quota is None or file_limit is None):
        cmd += ['--files', f'{file_quota}:{file_limit}']
    run_gpfs_command(cmd, ssh=ssh, cluster=cluster)
def get_gpfs_filesystems():
    """
    Gets GPFS filesystems by reading /etc/fstab ... mount type (3rd
    field) will be "gpfs" and the actual filesystem is determined by the
    "dev=" field in the mount options.

    A device name containing a colon is reported as remote, any other
    device name is reported as local. Blank lines, comment lines and
    lines with fewer than four fields are ignored.

    :returns: two lists - local filesystems and remote filesystems
    """
    local_fs = []
    remote_fs = []
    for line in _read_fstab().splitlines():
        entry = line.strip()
        # Any line whose first non-blank character is '#' is a comment,
        # whether or not a space follows the '#'
        if not entry or entry.startswith("#"):
            continue
        fields = entry.split()
        # Need at least device, mount point, type and options
        if len(fields) < 4 or fields[2] != "gpfs":
            continue
        for mount_opt in fields[3].split(","):
            if not mount_opt.startswith("dev="):
                continue
            mount_device = mount_opt.split("=")[1]
            if ':' in mount_device:
                remote_fs.append(mount_device)
            else:
                local_fs.append(mount_device)
    return local_fs, remote_fs
def _read_fstab():
    """
    Return the entire text of /etc/fstab as one string
    """
    with open('/etc/fstab') as handle:
        contents = handle.read()
    return contents
def run_mmlsnode(ssh=False, testcluster=False):
    """
    Execute the GPFS 'mmlsnode' command and collect its output into a
    dictionary that maps each cluster to the list of its nodes.

    :param bool ssh: If True, ssh out to the configured node running
        gpfs to run the command. If False, run locally.
    :param bool testcluster: If True and if ssh is also true, ssh out to
        the configured node running GPFS on the test cluster.
    :returns: GPFS nodes by cluster
    :rtype: dict(str, list(str))
    """
    output = run_gpfs_command(
        ['/usr/lpp/mmfs/bin/mmlsnode'],
        ssh=ssh,
        testcluster=testcluster
    )
    nodes_by_cluster = {}
    # the first two lines of the output are headers
    for row in output.splitlines()[2:]:
        tokens = row.split()
        # a usable row has a cluster name followed by at least one node
        if len(tokens) >= 2:
            nodes_by_cluster[tokens[0]] = tokens[1:]
    return nodes_by_cluster
def run_mmlslicense(ssh=False, testcluster=False):
    """
    Execute the GPFS 'mmlslicense' command and summarize its output as
    node counts per license type or per missing license type.

    Only lines starting with 'Number of nodes' are examined. The keys of
    the result are 'defined', 'server', 'fpo', 'client',
    'missing_server' and 'missing_client', each present only if the
    matching line appears in the output.

    :param bool ssh: If True, ssh out to the configured node running
        gpfs to run the command. If False, run locally.
    :param bool testcluster: If True and if ssh is also true, ssh out to
        the configured node running GPFS on the test cluster.
    :returns: GPFS license counts
    :rtype: dict(str, int)
    :raises GPFSCommandError: if the count on a recognized line cannot
        be parsed
    """
    # (text to look for in the line, key used in the result), checked in
    # this order with the first match winning
    markers = (
        ('defined in the cluster', 'defined'),
        ('with server license designation', 'server'),
        ('with FPO license designation', 'fpo'),
        ('with client license designation', 'client'),
        ('still requiring server license designation', 'missing_server'),
        ('still requiring client license designation', 'missing_client'),
    )
    output = run_gpfs_command(
        ['/usr/lpp/mmfs/bin/mmlslicense'],
        ssh=ssh,
        testcluster=testcluster
    )
    counts = {}
    for line in output.splitlines():
        if not line.startswith('Number of nodes'):
            continue
        key = next(
            (name for marker, name in markers if marker in line), None
        )
        if key is None:
            continue
        try:
            counts[key] = int(line.split(':')[1].strip())
        except Exception:
            raise GPFSCommandError(
                "Could not parse mmlslicense command output line: {0}"
                .format(line)
            )
    return counts
def get_longest_waiter():
    """
    Report the length in seconds, the reason, and the node (if
    applicable) of the longest waiter listed by 'mmdiag --waiters' on
    this node.

    :returns: dictionary with fields length, reason, and node containing
        information about the longest waiter on this node. If there are
        no waiters all fields will be set to None.
    """
    waiter = {'length': None, 'reason': None, 'node': None}
    longest = 0
    output = run_gpfs_command(['/usr/lpp/mmfs/bin/mmdiag', '--waiters'])
    for line in output.splitlines():
        timing = re.match(r'Waiting ([0-9]+\.[0-9]+) sec', line)
        # lines that do not describe a waiter are ignored
        if timing is None:
            continue
        seconds = float(timing.group(1))
        if seconds < longest:
            continue
        longest = seconds
        reason = re.search(r"reason '(.*)'", line)
        node = re.search(
            r'on node (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', line
        )
        waiter['length'] = seconds
        waiter['reason'] = reason.group(1) if reason else None
        waiter['node'] = node.group(1) if node else None
    return waiter
def get_quota_usage(filesystem, quota_type, timeout=600):
    """
    Run mmrepquota for one GPFS filesystem and quota type and return the
    parsed quota/usage records. This command should be run on a GPFS nsd
    or manager node.

    The command is run with '--block-size auto' and its output is handed
    to parse_gpfs_usage, so block usage in the records is in bytes.

    :param str filesystem: The GPFS filesystem to query
    :param str quota_type: either USR, GRP, or FILESET
    :param int timeout: Timeout in seconds to run the command, note that
        it may take a while so the default is 10 minutes
    :returns: list of quota/usage records
    :rtype: list(dict)
    :raises ValueError: if quota_type is not USR, GRP, or FILESET
    """
    for name, flag in (('USR', '-u'), ('GRP', '-g'), ('FILESET', '-j')):
        if quota_type == name:
            tflag = flag
            break
    else:
        raise ValueError(
            f'Invalid quota type {quota_type}, must be USR, GRP, or FILESET'
        )
    raw = run_gpfs_command(
        [
            '/usr/lpp/mmfs/bin/mmrepquota', tflag, filesystem,
            '--block-size', 'auto'
        ],
        timeout=timeout
    )
    return parse_gpfs_usage(raw, filesystem=filesystem)
def parse_gpfs_usage(raw, filesystem=None):
    """
    Takes the output of mmrepquota with --block-size set to auto and
    returns a list of dictionaries of quota/usage records. Every numeric
    column is passed through convert_byte_unit with a target of 'B' and
    stored as an int.

    The second line of the output must be the expected column header,
    and every non-blank line after it is parsed as one record.

    :param str raw: Output of mmrepquota decoded as utf-8
    :param str filesystem: Name of filesystem to insert into records, if
        None the records have no 'filesystem' key
    :returns: Quota usage records
    :rtype: list(dict)
    :raises GPFSCommandError: if the output is too short, the header
        does not match the expected fields, or a record line does not
        have the expected structure
    """
    lines = raw.splitlines()
    if len(lines) < 2:
        raise GPFSCommandError(
            'The output of mmrepquota is too short to contain the '
            'expected header line, cannot safely parse output.'
        )
    header = lines[1]
    body = lines[2:]
    # Note that there is as of 5.0.3 no -Y option for the mmrepquota
    # command so we check that the structure of the output matches the
    # expected whitespace-delimited fields or throw an error
    expected_header_fields = [
        'Name', 'fileset', 'type', 'blocks', 'quota', 'limit',
        'in_doubt', 'grace', '|', 'files', 'quota', 'limit',
        'in_doubt', 'grace'
    ]
    if header.split() != expected_header_fields:
        raise GPFSCommandError(
            'The fields returned by mmrepquota do not match the expected '
            'first-line of the output, cannot safely parse output. Has '
            'a new version of GPFS been installed recently?'
        )
    result = []
    for line in body:
        if not line.strip():
            continue
        # Block columns are left of the '|' and file columns right of it
        parts = line.split('|')
        fields1 = parts[0].split()
        fields2 = parts[1].split() if len(parts) > 1 else []
        if len(fields1) < 6 or len(fields2) < 3:
            raise GPFSCommandError(
                'Could not parse mmrepquota command output line: {0}'
                .format(line)
            )
        entry = {}
        if filesystem is not None:
            entry['filesystem'] = filesystem
        entry['name'] = fields1[0]
        entry['fileset'] = fields1[1]
        entry['type'] = fields1[2]
        entry['block_usage'] = int(convert_byte_unit(fields1[3], target='B'))
        entry['block_quota'] = int(convert_byte_unit(fields1[4], target='B'))
        entry['block_limit'] = int(convert_byte_unit(fields1[5], target='B'))
        # index 6 is in_doubt, the grace text may span several words
        entry['block_grace'] = ' '.join(fields1[7:])
        entry['file_usage'] = int(convert_byte_unit(fields2[0], target='B'))
        entry['file_quota'] = int(convert_byte_unit(fields2[1], target='B'))
        entry['file_limit'] = int(convert_byte_unit(fields2[2], target='B'))
        # index 3 is in_doubt, the grace text may span several words
        entry['file_grace'] = ' '.join(fields2[4:])
        result.append(entry)
    return result
def parse_gpfs_y_command(raw):
    """
    Parse the result of the raw output of GPFS cli command executed with
    the -Y option into a list of dictionaries keyed according to the
    header line in the output.

    The first three fields in each line of the raw output appear to be
    special. The first is the name of the command and is set as a
    "command" field. The second is the type of entity and will be set
    to a "type" field. The third field is HEADER if the line defines
    all further fields for an entity, any other value marks a regular
    output line.

    The HEADER entries are first parsed to determine the structure of
    each entry and are not added to the return list. Columns whose
    header name is empty or 'reserved' are discarded. Lines with fewer
    than three fields are ignored, and empty input gives an empty list.

    :param str raw: raw output of the GPFS command with -Y
    :returns: List of dictionaries containing parsed output
    :rtype: list(dict(str, str))
    :raises KeyError: if a regular line has an entity type for which no
        HEADER line is present in the output
    """
    rows = [line.split(':') for line in raw.splitlines()]
    rows = [fields for fields in rows if len(fields) >= 3]

    # First pass: record the column names of every entity type, with
    # None marking the columns that are to be dropped
    types = {}
    for fields in rows:
        if fields[2] != 'HEADER':
            continue
        types[fields[1]] = [
            None if (name == 'reserved' or not name) else name
            for name in fields[3:]
        ]

    # Second pass: turn every non-header line into a dictionary
    result = []
    for fields in rows:
        if fields[2] == 'HEADER':
            continue
        item = {'command': fields[0], 'type': fields[1]}
        header = types[fields[1]]
        # We have seen situations where GPFS will add an extra stray
        # colon to the end of a line, violating the format, so zip is
        # used to throw away any additional fields, see RT66893
        for name, value in zip(header, fields[3:]):
            if name:
                item[name] = value
        result.append(item)
    return result
def run_gpfs_command(
    arglist, ssh=False, cluster=None, testcluster=False, timeout=60
):
    """
    Runs a GPFS command as a subprocess and returns the standard output
    as a string decoded from utf-8. Optionally ssh out to a configured
    GPFS node and use a timeout.

    When ssh is True the target node is looked up in the 'gpfs' section
    of the configuration: 'testnode' if testcluster is True, otherwise
    '<cluster>_cluster_node' if cluster is given, otherwise 'prodnode'.
    The command is then run as root on that node.

    :param list(str) arglist: List of arguments run, including command
        name
    :param bool ssh: If True, ssh to the configured gpfs node to run
        command
    :param str cluster: If not None and if ssh is True, run on a
        configured node in the specified cluster, (i.e. accre or accre2)
    :param bool testcluster: If True and if ssh is True, run on a
        configured test cluster node rather than the production cluster
    :param int timeout: Maximum time in seconds to wait for command to
        complete
    :returns: Standard output of command interpreted as utf-8 text
    :rtype: str
    :raises GPFSCommandError: if the command exits with a non-zero
        status, the message contains the exit code and standard error
    :raises subprocess.TimeoutExpired: if the command does not complete
        within the timeout
    """
    if ssh:
        if testcluster:
            node = CONFIG['gpfs']['testnode']
        elif cluster is not None:
            node = CONFIG['gpfs'][f'{cluster}_cluster_node']
        else:
            node = CONFIG['gpfs']['prodnode']
        # NOTE(review): ssh hands the arguments to the remote shell as a
        # single string, so arguments with spaces or shell
        # metacharacters are re-parsed on the remote side - confirm that
        # callers only pass trusted values
        arglist = ['ssh', 'root@{0}'.format(node)] + list(arglist)
    # subprocess.run kills and reaps the child when the timeout expires
    # before re-raising TimeoutExpired, so no process is left behind
    proc = subprocess.run(
        arglist,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdin=subprocess.DEVNULL,
        timeout=timeout,
    )
    if proc.returncode != 0:
        # Decode leniently so that the error report itself cannot fail
        detail = proc.stderr.decode('utf-8', errors='replace').strip()
        msg = (
            'GPFS command failed with exit code {0}: {1}.'
            .format(proc.returncode, detail)
        )
        raise GPFSCommandError(msg)
    return proc.stdout.decode('utf-8')
# Older scripts import _run_gpfs_command directly, dating from when this
# function was private although it should have been public. The old name
# stays bound to the public function so that those imports keep working.
_run_gpfs_command = run_gpfs_command