"""
Wrappers to call GPFS commands
"""
import re
import subprocess
from accre.exceptions import ACCREError
from accre.config import get_config
from accre.util import convert_byte_unit
CONFIG = get_config()
class GPFSCommandError(ACCREError):
    """Raised when a GPFS command fails or its output cannot be parsed"""
def get_filesystem_info(filesystem=None):
    """
    Runs mmlsfs command, which returns various attributes of the filesystem
    being queried (execute "man mmlsfs" from any box with GPFS installed for
    full details).

    :param str filesystem: the GPFS filesystem to query; this is required
        even though the parameter defaults to None
    :returns: dictionary of filesystem attributes and their values
    :rtype: dict(str, str)
    :raises ValueError: if no filesystem is given
    :raises GPFSCommandError: if mmlsfs exits with a nonzero status
    """
    if filesystem is None:
        raise ValueError('filesystem must be specified')
    proc = subprocess.Popen(
        ["/usr/lpp/mmfs/bin/mmlsfs", filesystem, "-Y"],
        stdout=subprocess.PIPE,
        # stderr has to be piped as well, otherwise communicate() returns
        # None for it and the error raised below carries no diagnostics
        stderr=subprocess.PIPE,
    )
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise GPFSCommandError(
            "{0}".format(stderr.decode("utf-8", errors="replace"))
        )
    info = {}
    # The first line of the output is skipped, it is the header line
    for line in stdout.decode("utf-8").splitlines()[1:]:
        fields = line.split(":")
        # Ignore lines too short to hold the attribute name (8th field)
        # and its value (9th field)
        if len(fields) < 9:
            continue
        info[fields[7]] = fields[8]
    return info
def get_filesystem_quotas(quota_type, quota_target, filesystem=None):
    """
    Runs mmlsquota command, which returns quota information for the specified
    user, group, or fileset as appropriate (execute "man mmlsquota" from any box
    with GPFS installed for full details).

    :param str quota_type: -g for group, -j for fileset, -u for user
    :param str quota_target: group, fileset, or user to be looked up
    :param str filesystem: filesystem to report on; this is required even
        though the parameter defaults to None
    :returns: dict keyed by fileset where each value is a dict of quota
        information for that fileset (keys blockUsage, blockQuota,
        blockLimit, blockGrace, filesUsage, filesQuota, filesLimit and
        filesGrace)
    :rtype: dict(str, dict)
    :raises ValueError: if quota_type is not one of the accepted flags or
        no filesystem is given
    :raises GPFSCommandError: if mmlsquota exits with a nonzero status
    """
    if quota_type not in ('-g', '-j', '-u'):
        raise ValueError('quota_type must be "-g" (group), "-j" (fileset), or "-u" (user)')
    if filesystem is None:
        raise ValueError('filesystem must be specified')
    proc = subprocess.Popen(
        ["/usr/lpp/mmfs/bin/mmlsquota", quota_type, quota_target, "-Y", "-v", filesystem],
        stdout=subprocess.PIPE,
        # stderr has to be piped as well, otherwise communicate() returns
        # None for it and the error raised below carries no diagnostics
        stderr=subprocess.PIPE,
    )
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise GPFSCommandError(
            "{0}".format(stderr.decode("utf-8", errors="replace"))
        )
    quota = {}
    # The first line of the output is skipped, it is the header line
    for line in stdout.decode("utf-8").splitlines()[1:]:
        if not line.strip():
            continue
        fields = line.split(":")
        # For fileset quotas the name is taken from the 10th field,
        # otherwise the fileset is read from the 23rd field
        fileset = fields[9] if quota_type == "-j" else fields[22]
        if fileset == '':
            fileset = "root"
        quota[fileset] = {
            'blockUsage': int(fields[10]),
            'blockQuota': int(fields[11]),
            'blockLimit': int(fields[12]),
            'blockGrace': fields[14],
            'filesUsage': int(fields[15]),
            'filesQuota': int(fields[16]),
            'filesLimit': int(fields[17]),
            'filesGrace': fields[19],
        }
    return quota
def set_fileset_quota(
    filesystem,
    fileset,
    block_quota,
    block_limit,
    file_quota=None,
    file_limit=None,
    ssh=False,
    cluster='accre2'
):
    """
    Apply a block quota and limit to a fileset with mmsetquota, and
    optionally a file quota and limit as well.

    :param str filesystem: Filesystem that the fileset belongs to
    :param str fileset: Name of the GPFS fileset
    :param str block_quota: Soft block quota (should be in units of
        k, M, G, T, etc..)
    :param str block_limit: Hard block quota/limit (should be in units of
        k, M, G, T, etc..)
    :param str file_quota: Soft file quota. The file settings are only
        changed when both file_quota and file_limit are given.
    :param str file_limit: Hard file limit. The file settings are only
        changed when both file_quota and file_limit are given.
    :param bool ssh: If True, ssh out to the configured node running gpfs
        to run the command. If False, run locally.
    :param str cluster: GPFS cluster that contains the fileset, defaults
        to accre2. You cannot set quotas on a remote cluster so the node
        running this function or sshed to must be in the same cluster as
        the filesystem.
    """
    command = [
        '/usr/lpp/mmfs/bin/mmsetquota',
        f'{filesystem}:{fileset}',
        '--block',
        f'{block_quota}:{block_limit}',
    ]
    # A lone file quota or file limit is ignored, both must be supplied
    if not (file_quota is None or file_limit is None):
        command += ['--files', f'{file_quota}:{file_limit}']
    run_gpfs_command(command, ssh=ssh, cluster=cluster)
def get_gpfs_filesystems(fstab=None):
    """
    Gets GPFS filesystems by reading /etc/fstab ... mount type (3rd field) will
    be "gpfs" and the actual filesystem is determined by the "dev=" field in
    the mount options.

    A device is reported as remote when its name contains a colon and as
    local otherwise. Blank lines, comment lines and entries with fewer than
    four fields are ignored.

    :param str fstab: fstab-formatted text to parse. When None (the
        default) the contents of /etc/fstab are read.
    :returns: two lists - local filesystems and remote filesystems
    """
    if fstab is None:
        fstab = _read_fstab()
    local_fs = []
    remote_fs = []
    for line in fstab.splitlines():
        entry = line.strip()
        # Comments are recognised with or without a space after the '#'
        if not entry or entry.startswith("#"):
            continue
        fields = entry.split()
        # The mount type and mount options are the 3rd and 4th fields
        if len(fields) < 4 or fields[2] != "gpfs":
            continue
        for mount_opt in fields[3].split(","):
            if not mount_opt.startswith("dev="):
                continue
            mount_device = mount_opt.split("=", 1)[1]
            if ':' in mount_device:
                remote_fs.append(mount_device)
            else:
                local_fs.append(mount_device)
    return local_fs, remote_fs
def _read_fstab():
    """
    Return the complete text of /etc/fstab as a single string
    """
    with open('/etc/fstab') as fstab_file:
        contents = fstab_file.read()
    return contents
def run_mmlsnode(ssh=False, testcluster=False):
    """
    Execute the GPFS 'mmlsnode' command and collect the nodes it lists
    into a dictionary keyed by cluster name.

    :param bool ssh: If True, ssh out to the configured node running gpfs
        to run the command. If False, run locally.
    :param bool testcluster: If True and if ssh is also true, ssh out to
        the configured node running GPFS on the test cluster.
    :returns: GPFS nodes by cluster
    :rtype: dict(str, list(str))
    """
    output = run_gpfs_command(
        ['/usr/lpp/mmfs/bin/mmlsnode'],
        ssh=ssh, testcluster=testcluster
    )
    # The first two lines are headers, everything after them is data
    rows = (row.split() for row in output.splitlines()[2:])
    # A usable row holds the cluster name followed by at least one node
    return {row[0]: row[1:] for row in rows if len(row) >= 2}
def run_mmlslicense(ssh=False, testcluster=False):
    """
    Execute the GPFS 'mmlslicense' command and summarise its output as
    node counts per license type or missing license type.

    Only lines starting with 'Number of nodes' are considered. The keys
    that may appear in the result are defined, server, fpo, client,
    missing_server and missing_client.

    :param bool ssh: If True, ssh out to the configured node running gpfs
        to run the command. If False, run locally.
    :param bool testcluster: If True and if ssh is also true, ssh out to
        the configured node running GPFS on the test cluster.
    :returns: GPFS license counts
    :rtype: dict(str, int)
    :raises GPFSCommandError: if the count on a recognised line cannot be
        parsed
    """
    # Phrase identifying each kind of count, checked in this order
    phrases = (
        ('defined in the cluster', 'defined'),
        ('with server license designation', 'server'),
        ('with FPO license designation', 'fpo'),
        ('with client license designation', 'client'),
        ('still requiring server license designation', 'missing_server'),
        ('still requiring client license designation', 'missing_client'),
    )
    output = run_gpfs_command(
        ['/usr/lpp/mmfs/bin/mmlslicense'],
        ssh=ssh, testcluster=testcluster
    )
    counts = {}
    for line in output.splitlines():
        if not line.startswith('Number of nodes'):
            continue
        key = next((name for phrase, name in phrases if phrase in line), None)
        if key is None:
            continue
        try:
            # The count is whatever follows the first colon on the line
            counts[key] = int(line.split(':')[1].strip())
        except Exception:
            raise GPFSCommandError(
                "Could not parse mmlslicense command output line: {0}"
                .format(line)
            )
    return counts
def get_longest_waiter():
    """
    Run 'mmdiag --waiters' and report the waiter with the greatest wait
    time: its length in seconds, its reason, and its node (if applicable).

    When several waiters share the greatest length, the last one in the
    output is reported.

    :returns: dictionary with fields length, reason, and node containing
        information about the longest waiter on this node. If there are no
        waiters all fields will be set to None.
    """
    longest_waiter = {'length': None, 'reason': None, 'node': None}
    current_max = 0
    output = run_gpfs_command(['/usr/lpp/mmfs/bin/mmdiag', '--waiters'])
    for line in output.splitlines():
        timing = re.match(r'Waiting ([0-9]+\.[0-9]+) sec', line)
        # Lines that do not describe a waiter are skipped
        if timing is None:
            continue
        seconds = float(timing.group(1))
        if seconds < current_max:
            continue
        current_max = seconds
        reason = re.search(r"reason '(.*)'", line)
        node = re.search(
            r'on node (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', line
        )
        longest_waiter = {
            'length': seconds,
            'reason': reason.group(1) if reason else None,
            'node': node.group(1) if node else None,
        }
    return longest_waiter
def get_quota_usage(filesystem, quota_type, timeout=600):
    """
    Run mmrepquota for the given GPFS filesystem and quota type and return
    the parsed quota/usage records. This command should be run on a GPFS
    nsd or manager node. The records list block usage in bytes and are
    suitable for ingestion into the GPFS usage table of the admin
    database, see :mod:`accre.database`.

    :param str filesystem: The GPFS filesystem to query
    :param str quota_type: either USR, GRP, or FILESET
    :param int timeout: Timeout in seconds to run the command, note
        that it may take a while so the default is 10 minutes
    :returns: list of quota/usage records
    :rtype: list(dict)
    :raises ValueError: if quota_type is not USR, GRP, or FILESET
    """
    # mmrepquota flag selecting each supported quota type
    type_flags = (('USR', '-u'), ('GRP', '-g'), ('FILESET', '-j'))
    tflag = next(
        (flag for name, flag in type_flags if quota_type == name), None
    )
    if tflag is None:
        raise ValueError(
            f'Invalid quota type {quota_type}, must be USR, GRP, or FILESET'
        )
    output = run_gpfs_command(
        [
            '/usr/lpp/mmfs/bin/mmrepquota',
            tflag,
            filesystem,
            '--block-size',
            'auto',
        ],
        timeout=timeout
    )
    return parse_gpfs_usage(output, filesystem=filesystem)
def parse_gpfs_usage(raw, filesystem=None):
    """
    Takes the output of mmrepquota with --block-size set to auto and
    returns a list of dictionaries of quota/usage records.

    The records list block usage in bytes and are suitable for ingestion
    into the GPFS usage table of the admin database, see
    :mod:`accre.database`.

    The second line of the output must be the expected column header.
    Every following non-blank line is split at the '|' separator into its
    block columns and its file columns.

    :param str raw: Output of mmrepquota decoded as utf-8
    :param str filesystem: Name of filesystem to insert into records, no
        filesystem key is added when None
    :returns: Quota usage records
    :rtype: list(dict)
    :raises GPFSCommandError: if the output is too short, the header does
        not match the expected fields, or a record line does not have the
        expected columns
    """
    lines = raw.splitlines()
    # The column header is expected on the second line, so anything
    # shorter than two lines cannot be a usable report
    if len(lines) < 2:
        raise GPFSCommandError(
            'The output of mmrepquota is too short to contain the expected '
            'header lines, cannot safely parse output.'
        )
    header = lines[1]
    body = lines[2:]
    # Note that there is as of 5.0.3 no -Y option for the mmrepquota command
    # so we check that the structure of the output matches the expected
    # whitespace-delimited fields or throw an error
    expected_header_fields = [
        'Name', 'fileset', 'type', 'blocks', 'quota', 'limit',
        'in_doubt', 'grace', '|', 'files', 'quota', 'limit', 'in_doubt',
        'grace'
    ]
    if header.split() != expected_header_fields:
        raise GPFSCommandError(
            'The fields returned by mmrepquota do not match the expected '
            'first-line of the output, cannot safely parse output. Has '
            'a new version of GPFS been installed recently?'
        )
    result = []
    for line in body:
        # Skip empty lines as well as lines holding only whitespace
        if not line.strip():
            continue
        parts = line.split('|')
        fields1 = parts[0].split()
        fields2 = parts[1].split() if len(parts) > 1 else []
        # Report a malformed record clearly instead of failing with a
        # bare IndexError further down
        if len(fields1) < 6 or len(fields2) < 3:
            raise GPFSCommandError(
                'Could not parse mmrepquota command output line: {0}'
                .format(line)
            )
        entry = {}
        if filesystem is not None:
            entry['filesystem'] = filesystem
        entry['name'] = fields1[0]
        entry['fileset'] = fields1[1]
        entry['type'] = fields1[2]
        entry['block_usage'] = int(convert_byte_unit(fields1[3], target='B'))
        entry['block_quota'] = int(convert_byte_unit(fields1[4], target='B'))
        entry['block_limit'] = int(convert_byte_unit(fields1[5], target='B'))
        # The in_doubt column (index 6) is skipped, the grace value may
        # span several whitespace-separated words
        entry['block_grace'] = ' '.join(fields1[7:])
        # The file columns go through the same conversion helper
        entry['file_usage'] = int(convert_byte_unit(fields2[0], target='B'))
        entry['file_quota'] = int(convert_byte_unit(fields2[1], target='B'))
        entry['file_limit'] = int(convert_byte_unit(fields2[2], target='B'))
        entry['file_grace'] = ' '.join(fields2[4:])
        result.append(entry)
    return result
def parse_gpfs_y_command(raw):
    """
    Parse the result of the raw output of GPFS cli command executed with
    the -Y option into a list of dictionaries keyed according to the
    header line in the output.

    The first three fields in each line of the raw output appear to be
    special. The first is the name of the command and is set as a
    "command" field. The second is the type of entity and will be set
    to a "type" field. The third field is HEADER if the line defines
    all further fields for an entity, otherwise the line is treated as
    regular output.

    The HEADER entries are first parsed to determine the structure of
    each entry and are not added to the return list. The first line of
    the output is never treated as a regular entry.

    Columns whose header name is empty or 'reserved' are discarded.
    Empty output yields an empty list.

    :param str raw: raw output of the GPFS command with -Y
    :returns: List of dictionaries containing parsed output
    :rtype: list(dict(str, str))
    :raises KeyError: if a regular line has an entity type for which no
        HEADER line is present
    """
    raw_lines = raw.splitlines()
    # Determine the column names of each entity type, using None to mark
    # the columns that are to be discarded
    types = {}
    for line in raw_lines:
        fields = line.split(':')
        if len(fields) < 3 or fields[2] != 'HEADER':
            continue
        types[fields[1]] = [
            None if (not name or name == 'reserved') else name
            for name in fields[3:]
        ]
    # Parse non-header entries
    result = []
    for line in raw_lines[1:]:
        fields = line.split(':')
        if len(fields) < 3 or fields[2] == 'HEADER':
            continue
        header = types[fields[1]]
        item = {'command': fields[0], 'type': fields[1]}
        # We have seen situations where GPFS will add an extra stray
        # colon to the end of a line, violating the format, so we
        # throw away any additional fields, see RT66893. zip() stops at
        # the shorter of the two sequences, which does exactly that.
        for name, value in zip(header, fields[3:]):
            if name:
                item[name] = value
        result.append(item)
    return result
[docs]def run_gpfs_command(
arglist,
ssh=False,
cluster=None,
testcluster=False,
timeout=60
):
"""
Runs a GPFS command as a subprocess and returns the standard
output as a string decoded from utf-8.
Optionally ssh out to a configured GPFS node and use a timeout.
:param list(str) arglist: List of arguments run, including command name
:param bool ssh: If True, ssh to the configured gpfs node to run command
:param str cluster: If True and if ssh is True, run on a configured node
in the specified cluster, (i.e. accre or accre2)
:param bool testcluster: If True and if ssh is True, run on a configured
test cluster node rather than the production cluster
:param int timeout: Maximum time in seconds to wait for command
to complete
:returns: Standard output of command interpreted as utf-8 text
:rtype: str
"""
if ssh:
if testcluster:
server = 'root@{0}'.format(CONFIG['gpfs']['testnode'])
elif cluster is not None:
server = 'root@{0}'.format(
CONFIG['gpfs'][f'{cluster}_cluster_node']
)
else:
server = 'root@{0}'.format(CONFIG['gpfs']['prodnode'])
ssh_args = ['ssh', server]
arglist = ssh_args + arglist
proc = subprocess.Popen(
arglist, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL
)
stdout, stderr = proc.communicate(timeout=timeout)
if proc.returncode != 0:
msg = (
'GPFS command failed with exit code {0}: {1}.'
.format(proc.returncode, stderr)
)
raise GPFSCommandError(msg)
return stdout.decode('utf-8')
# Some old scripts directly import _run_gpfs_command as it was initially
# a private function that should have been public. Keep this reference
# for compatibility
_run_gpfs_command = run_gpfs_command