"""
Mixin class for the database client with GPFS-related
functions
"""
from datetime import datetime
from accre.database.model import *
from accre.exceptions import ACCREValueError
class DBClientGPFSMixin:
"""
Functionality related to GPFS
"""
def update_gpfs_long_waiter(self, *, hostname, last_check,
length=None,
reason=None,
node=None
):
"""
Insert or update a GPFS long waiter record for a given hostname
:param str hostname: Internal hostname of the server (required)
:param datetime.datetime last_check: Last time this node was
checked for long waiters (required)
:param float length: Length of the longest waiter in seconds
:param str reason: Reason for the waiter
:param str node: IP address of the node the waiter is for
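
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin, and the hostname and
node values are hypothetical)::

    from datetime import datetime

    client.update_gpfs_long_waiter(
        hostname='gpfs-nsd01',
        last_check=datetime.now(),
        length=312.5,
        reason='waiting for RPC replies',
        node='10.0.8.21'
    )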
"""
record_values = {
'hostname': hostname,
'last_check': last_check,
'length': length,
'reason': reason,
'node': node
}
with self.engine.connect() as conn:
s = select([GPFS_LONG_WAITERS.c.hostname]).where(
hostname == GPFS_LONG_WAITERS.c.hostname
)
result = list(conn.execute(s))
if result:
up = (GPFS_LONG_WAITERS.update()
.where(GPFS_LONG_WAITERS.c.hostname == hostname)
.values(**record_values)
)
conn.execute(up)
else:
ins = GPFS_LONG_WAITERS.insert().values(**record_values)
conn.execute(ins)
def get_gpfs_longest_waiters(self, since=None):
"""
Retrieve the longest waiter for each host in the database where
the longest waiter has a set length, sorted in decreasing order
by length.
If ``since`` is set to a datetime, only retrieve hosts
that have been updated since that datetime.
:param datetime.datetime since: Earliest update time for which
to retrieve long waiter records
:returns: Long waiter records for all hosts with a
valid waiter (length is not null)
:rtype: list(dict)
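
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin)::

    from datetime import datetime, timedelta

    # waiters on hosts checked within the last hour, longest first
    recent = client.get_gpfs_longest_waiters(
        since=datetime.now() - timedelta(hours=1)
    )
    for waiter in recent:
        print(waiter['hostname'], waiter['length'], waiter['reason'])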
"""
if since is None:
since = datetime(1900, 1, 1)
with self.engine.connect() as conn:
s = select([GPFS_LONG_WAITERS]).where(
GPFS_LONG_WAITERS.c.last_check > since
)
waiters = [dict(r) for r in conn.execute(s)]
waiters = [r for r in waiters if r['length'] is not None]
waiters.sort(reverse=True, key=lambda i: i['length'])
return waiters
def update_gpfs_usage(self, *, filesystem, name, fileset,
type,
block_usage, block_quota, block_limit, block_grace,
file_usage, file_quota, file_limit, file_grace
):
"""
Insert or update a GPFS usage record for a given
(filesystem, name, fileset) identifier.
:param str filesystem: GPFS filesystem record refers to
:param str name: Fileset, user, or group identifier
:param str fileset: Parent or associated fileset
:param str type: Either FILESET, USR, or GRP
:param int block_usage: usage in bytes
:param int block_quota: (soft) quota in bytes
:param int block_limit: (hard quota) limit in bytes
:param str block_grace: Grace period for size quota, could
be none, expired, or a GPFS-format time value
:param int file_usage: usage in number of files
:param int file_quota: (soft) quota in number of files
:param int file_limit: (hard quota) limit in number of files
:param str file_grace: Grace period for file count quota, could
be none, expired, or a GPFS-format time value
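
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin; the filesystem name,
group name, and quota values are hypothetical)::

    client.update_gpfs_usage(
        filesystem='gpfs23',
        name='example_group',
        fileset='root',
        type='FILESET',
        block_usage=2 * 1024**4,   # 2 TiB currently used
        block_quota=5 * 1024**4,
        block_limit=6 * 1024**4,
        block_grace='none',
        file_usage=1200000,
        file_quota=5000000,
        file_limit=6000000,
        file_grace='none'
    )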
"""
record_values = {
'filesystem': filesystem,
'name': name,
'fileset': fileset,
'type': type,
'last_check': datetime.now(),
'block_usage': block_usage,
'block_quota': block_quota,
'block_limit': block_limit,
'block_grace': block_grace,
'file_usage': file_usage,
'file_quota': file_quota,
'file_limit': file_limit,
'file_grace': file_grace,
}
with self.engine.connect() as conn:
s = select([
GPFS_USAGE.c.filesystem,
GPFS_USAGE.c.name,
GPFS_USAGE.c.fileset
]).where(
(filesystem == GPFS_USAGE.c.filesystem) &
(name == GPFS_USAGE.c.name) &
(fileset == GPFS_USAGE.c.fileset)
)
result = list(conn.execute(s))
if result:
up = (GPFS_USAGE.update()
.where(
(filesystem == GPFS_USAGE.c.filesystem) &
(name == GPFS_USAGE.c.name) &
(fileset == GPFS_USAGE.c.fileset)
)
.values(**record_values)
)
conn.execute(up)
else:
ins = GPFS_USAGE.insert().values(**record_values)
conn.execute(ins)
def get_gpfs_usage_records(self, *, filesystem, type,
name=None, fileset=None
):
"""
Retrieve all GPFS usage records for the given filesystem and
type, either for all filesets and names or for a specific
fileset and/or name if specified.
:param str filesystem: The GPFS filesystem to query
:param str type: Either USR, GRP, or FILESET
:param str name: Record name if specified (None for any name)
:param str fileset: Fileset name if specified (None for any fileset)
:returns: Usage and quota information for all records matching
the given criteria
:rtype: list(dict)
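
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin; names are hypothetical)::

    records = client.get_gpfs_usage_records(
        filesystem='gpfs23',
        type='USR',
        fileset='example_fileset'
    )
    for rec in records:
        print(rec['name'], rec['block_usage'], rec['block_limit'])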
"""
if name is None and fileset is None:
criteria = (
(filesystem == GPFS_USAGE.c.filesystem) &
(type == GPFS_USAGE.c.type)
)
elif fileset is None:
criteria = (
(filesystem == GPFS_USAGE.c.filesystem) &
(type == GPFS_USAGE.c.type) &
(name == GPFS_USAGE.c.name)
)
elif name is None:
criteria = (
(filesystem == GPFS_USAGE.c.filesystem) &
(type == GPFS_USAGE.c.type) &
(fileset == GPFS_USAGE.c.fileset)
)
else:
criteria = (
(filesystem == GPFS_USAGE.c.filesystem) &
(type == GPFS_USAGE.c.type) &
(fileset == GPFS_USAGE.c.fileset) &
(name == GPFS_USAGE.c.name)
)
with self.engine.connect() as conn:
s = select([GPFS_USAGE]).where(criteria)
return [dict(r) for r in conn.execute(s)]
def add_gpfs_fileset(self, *, filesystem, name,
block_quota, block_limit, file_quota, file_limit,
gid, path, user_path,
active=True,
join_date=None,
fileset='root'
):
"""
Add a new GPFS fileset record determined by a
(filesystem, name, fileset) identifier.
Note that generally in the current (summer 2020)
setup the "fileset" will be "root". These records are
intended to track filesets owned by ACCRE groups
on /data or /dors, not anything relating to user home
or scratch quotas.
:param str filesystem: GPFS filesystem record refers to
:param str name: Fileset identifier
:param str fileset: Parent fileset, typically root
:param int block_quota: (soft) quota in bytes
:param int block_limit: (hard quota) limit in bytes
:param int file_quota: (soft) quota in number of files
:param int file_limit: (hard quota) limit in number of files
:param int gid: Numeric GID of the group that owns the fileset
:param str path: The path that the fileset is linked to in GPFS
:param str user_path: The path that the user should access the
fileset by, either via symlink, bind mount, or other method
:param bool active: If this fileset should still exist on the cluster
:param datetime.datetime join_date: Date the fileset was added to the
cluster, defaults to the current date
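
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin; the paths, GID, and
quota values are hypothetical)::

    client.add_gpfs_fileset(
        filesystem='gpfs23',
        name='example_group',
        block_quota=5 * 1024**4,
        block_limit=6 * 1024**4,
        file_quota=5000000,
        file_limit=6000000,
        gid=12345,
        path='/gpfs23/data/example_group',
        user_path='/data/example_group'
    )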
"""
if join_date is None:
join_date = datetime.now()
with self.engine.begin() as conn:
ins = GPFS_FILESETS.insert().values(
filesystem=filesystem,
name=name,
fileset=fileset,
block_quota=block_quota,
block_limit=block_limit,
file_quota=file_quota,
file_limit=file_limit,
gid=gid,
path=path,
user_path=user_path,
active=active,
join_date=join_date
)
conn.execute(ins)
def gpfs_fileset_info(self, *, filesystem, name, fileset='root',
usage=False
):
"""
Retrieve general information for a GPFS fileset. If usage is set
to True, also look up most recent usage record for the fileset and
give the time of the last usage check.
Usage will be set to None if there is no usage record available
in the database.
:param str filesystem: GPFS filesystem record refers to
:param str name: Fileset identifier
:param str fileset: Parent fileset, typically root
:param bool usage: If True, also include the most recent usage data
and the time of the last usage check
:returns: General fileset information
:rtype: dict
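
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin; names are hypothetical)::

    info = client.gpfs_fileset_info(
        filesystem='gpfs23',
        name='example_group',
        usage=True
    )
    print(info['group_name'], info['block_usage'], info['block_limit'])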
"""
with self.engine.connect() as conn:
s = select([GPFS_FILESETS]).where(
(GPFS_FILESETS.c.filesystem == filesystem) &
(GPFS_FILESETS.c.name == name) &
(GPFS_FILESETS.c.fileset == fileset)
)
fs = conn.execute(s).fetchone()
result = dict(fs)
result['group_name'] = ''
gid = result['gid']
s = select([GROUPS]).where(GROUPS.c.group_id == gid)
group = conn.execute(s).fetchone()
if group is not None:
result['group_name'] = group['name']
if usage:
result['block_usage'] = None
result['file_usage'] = None
result['last_usage_check'] = None
s = select([GPFS_USAGE]).where(
(GPFS_USAGE.c.filesystem == result['filesystem']) &
(GPFS_USAGE.c.name == result['name']) &
(GPFS_USAGE.c.fileset == result['fileset'])
)
usage = conn.execute(s).fetchone()
if usage is not None:
result['block_usage'] = usage['block_usage']
result['file_usage'] = usage['file_usage']
result['last_usage_check'] = usage['last_check']
return result
def all_gpfs_fileset_info(self, active=True):
"""
Retrieve general information about all active filesets, or
optionally about all filesets in the database
:param bool active: If True, only return active filesets; if False,
return all filesets
:returns: General fileset information for all filesets
:rtype: list(dict)
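
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin)::

    for fs in client.all_gpfs_fileset_info():
        print(fs['filesystem'], fs['name'], fs['path'], fs['gid'])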
"""
with self.engine.connect() as conn:
if active:
s = select([GPFS_FILESETS]).where(
GPFS_FILESETS.c.active == True
)
else:
s = select([GPFS_FILESETS])
fs = [dict(f) for f in conn.execute(s).fetchall()]
return fs
def modify_gpfs_fileset(self, *, filesystem, name, fileset='root',
**updates
):
"""
Modify one or more fields of an existing GPFS fileset.
Note that this method does not log the change. If the associated
group exists, it is suggested to add a log entry to that group.
:param str filesystem: GPFS filesystem record refers to
:param str name: Fileset identifier
:param str fileset: Parent fileset, typically root
:param dict updates: Columns to be updated with their values
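
Example (an illustrative sketch; assumes ``client`` is an instance of a
database client class that includes this mixin; names and values are
hypothetical)::

    client.modify_gpfs_fileset(
        filesystem='gpfs23',
        name='example_group',
        block_quota=10 * 1024**4,
        block_limit=12 * 1024**4
    )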
"""
invalid_fields = [
c for c in updates if c not in GPFS_FILESETS.columns
]
if invalid_fields:
raise ACCREValueError(
'Invalid fileset fields for update: {}'.format(invalid_fields)
)
with self.engine.begin() as conn:
up = GPFS_FILESETS.update().values(**updates).where(
(GPFS_FILESETS.c.filesystem == filesystem) &
(GPFS_FILESETS.c.name == name) &
(GPFS_FILESETS.c.fileset == fileset)
)
conn.execute(up)