# Source code for accre.database.client.gpfs

"""
Mixin class for databse client with GPFS related
functions
"""
from datetime import datetime

from accre.database.model import *
from accre.exceptions import ACCREValueError


class DBClientGPFSMixin:
    """
    Functionality related to GPFS: long-waiter tracking, quota
    contacts, usage records, and fileset inventory. Mixed into the
    database client; expects ``self.engine`` to be a SQLAlchemy engine.
    """
[docs] def update_gpfs_long_waiter(self, *, hostname, last_check, length=None, reason=None, node=None ): """ Insert or update a GPFS long waiter record for a given hostname :param str hostname: Internal hostname of the server (required) :param datetime.datetime last_check: Last time this node was checked for long waiters (required) :param float length: Length of the longest waiter in seconds :param str reason: Reason for the waiter :param str node: IP address of the node the waiter is for """ record_values = { 'hostname': hostname, 'last_check': last_check, 'length': length, 'reason': reason, 'node': node } with self.engine.connect() as conn: s = select([GPFS_LONG_WAITERS.c.hostname]).where( hostname == GPFS_LONG_WAITERS.c.hostname ) result = list(conn.execute(s)) if result: up = (GPFS_LONG_WAITERS.update() .where(GPFS_LONG_WAITERS.c.hostname == hostname) .values(**record_values) ) conn.execute(up) else: ins = GPFS_LONG_WAITERS.insert().values(**record_values) conn.execute(ins)
[docs] def get_gpfs_longest_waiters(self, since=None): """ Retrieve the longest waiter for each host in the database where the longest waiter has a set length sorted in decreasing order by length. If ``since`` is set to a datetime, only retrieve hosts that have been updated since that datetime. :param datetime.datetime since: Earliest update time for which to retrieve managed host records :returns: Managed inventory information for all hosts with a valid waiter (length is not null) :rtype: list(dict) """ if since is None: since = datetime(1900, 1, 1) with self.engine.connect() as conn: s = select([GPFS_LONG_WAITERS]).where( GPFS_LONG_WAITERS.c.last_check > since ) waiters = [dict(r) for r in conn.execute(s)] waiters = [r for r in waiters if r['length'] is not None] waiters.sort(reverse=True, key=lambda i: i['length']) return waiters
[docs] def get_gpfs_quota_contacts(self, entity, etype, fallback=False): """ Retrieve all quota contact emails for the given entity of the given type (group, fileset). If there are no quota contact entries in the database, and a group exists with the same name as the entity, then the PI email for that group is returned in the list. :param str entity: Name of the group or fileset being queried :param str etype: Type of gpfs quota entity (fileset, group) :param bool fallback: If True, and a group with the same name as entity exists, return the PI email for that group if there are no quota contacts. :returns: List of quota contact emails for the entity :rtype: list(str) """ with self.engine.connect() as conn: s = select([GPFS_QUOTA_CONTACTS]).where( (GPFS_QUOTA_CONTACTS.c.entity == entity) & (GPFS_QUOTA_CONTACTS.c.etype == etype) ) result = [dict(r)['email'] for r in conn.execute(s)] if fallback and not result: s = select( [GROUPS, PIS.c.email, PIS.c.vunetid] ).where( (GROUPS.c.name == entity) & (GROUPS.c.pi == PIS.c.vunetid) ) group = conn.execute(s).fetchone() if group: result.append(dict(group)['email']) return result
[docs] def get_all_gpfs_quota_contacts(self): """ Retrieve all quota contact emails for all entities of all types (group, fileset, etc.) as a list of dictionaries, one for each entry in the quota contacts table. :returns: List of quota contact entries :rtype: list(dict(str,str)) """ with self.engine.connect() as conn: s = select([GPFS_QUOTA_CONTACTS]) return [dict(r) for r in conn.execute(s)]
[docs] def add_gpfs_quota_contact(self, entity, etype, email): """ Add the quota contact email for the given entity and type (group, fileset). :param str entity: Name of the group or fileset being queried :param str etype: Type of gpfs quota entity (fileset, group) :param str email: quota contact email address """ with self.engine.connect() as conn: ins = GPFS_QUOTA_CONTACTS.insert().values( entity=entity, etype=etype, email=email ) conn.execute(ins)
[docs] def remove_gpfs_quota_contact(self, entity, etype, email): """ Remove the quota contact email for the given entity and type (group, fileset). :param str entity: Name of the group or fileset being queried :param str etype: Type of gpfs quota entity (fileset, group) :param str email: quota contact email address """ with self.engine.begin() as conn: delete = GPFS_QUOTA_CONTACTS.delete().where( (GPFS_QUOTA_CONTACTS.c.entity == entity) & (GPFS_QUOTA_CONTACTS.c.etype == etype) & (GPFS_QUOTA_CONTACTS.c.email == email) ) conn.execute(delete)
[docs] def remove_all_gpfs_quota_contacts(self, entity, etype): """ Remove all quota contact emails for the given entity and type (group, fileset). :param str entity: Name of the group or fileset being queried :param str etype: Type of gpfs quota entity (fileset, group) """ with self.engine.begin() as conn: delete = GPFS_QUOTA_CONTACTS.delete().where( (GPFS_QUOTA_CONTACTS.c.entity == entity) & (GPFS_QUOTA_CONTACTS.c.etype == etype) ) conn.execute(delete)
[docs] def update_gpfs_usage(self, *, filesystem, name, fileset, type, block_usage, block_quota, block_limit, block_grace, file_usage, file_quota, file_limit, file_grace ): """ Insert or update a GPFS usage record for a given (filesystem, name, fileset) identifier. :param str filesystem: GPFS filesystem record refers to :param str name: Fileset, user, or group identifier :param str fileset: Parent or associated fileset :param str type: Either FILESET, USR, or GRP :param int block_usage: usage in bytes :param int block_quota: (soft) quota in bytes :param int block_limit: (hard quota) limit in bytes :param string block_grace: Grace period for size quota, could be none, expired, or a GPFS-format time value :param int file_usage: usage in number of files :param int file_quota: (soft) quota in number of files :param int file_limit: (hard quota) limit in number of files :param string file_grace: Grace period for file count quota, could be none, expired, or a GPFS-format time value """ record_values = { 'filesystem': filesystem, 'name': name, 'fileset': fileset, 'type': type, 'last_check': datetime.now(), 'block_usage': block_usage, 'block_quota': block_quota, 'block_limit': block_limit, 'block_grace': block_grace, 'file_usage': file_usage, 'file_quota': file_quota, 'file_limit': file_limit, 'file_grace': file_grace, } with self.engine.connect() as conn: s = select([ GPFS_USAGE.c.filesystem, GPFS_USAGE.c.name, GPFS_USAGE.c.fileset ]).where( (filesystem == GPFS_USAGE.c.filesystem) & (name == GPFS_USAGE.c.name) & (fileset == GPFS_USAGE.c.fileset) ) result = list(conn.execute(s)) if result: up = (GPFS_USAGE.update() .where( (filesystem == GPFS_USAGE.c.filesystem) & (name == GPFS_USAGE.c.name) & (fileset == GPFS_USAGE.c.fileset) ) .values(**record_values) ) conn.execute(up) else: ins = GPFS_USAGE.insert().values(**record_values) conn.execute(ins)
[docs] def get_gpfs_usage_records(self, *, filesystem, type, name=None, fileset=None ): """ Retrieve all GPFS usage records for the given filesystem, and type for all filesets and names or for a specific fileset and/or name if specified. :param str filesystem: The GPFS filesystem to query :param str type: Either USR, GRP, or FILESET :param str name: Record name if specified (None for any name) :param str fileset: Fileset name if specified (None for any fileset) :returns: Usage and quota information for all fileset records specified. :rtype: list(dict) """ if name is None and fileset is None: criteria = ( (filesystem == GPFS_USAGE.c.filesystem) & (type == GPFS_USAGE.c.type) ) elif fileset is None: criteria = ( (filesystem == GPFS_USAGE.c.filesystem) & (type == GPFS_USAGE.c.type) & (name == GPFS_USAGE.c.name) ) elif name is None: criteria = ( (filesystem == GPFS_USAGE.c.filesystem) & (type == GPFS_USAGE.c.type) & (fileset == GPFS_USAGE.c.fileset) ) else: criteria = ( (filesystem == GPFS_USAGE.c.filesystem) & (type == GPFS_USAGE.c.type) & (fileset == GPFS_USAGE.c.fileset) & (name == GPFS_USAGE.c.name) ) with self.engine.connect() as conn: s = select([GPFS_USAGE]).where(criteria) return [dict(r) for r in conn.execute(s)]
[docs] def add_gpfs_fileset(self, *, filesystem, name, block_quota, block_limit, file_quota, file_limit, gid, path, user_path, active=True, join_date=None, fileset='root' ): """ Add a new GPFS fileset record determined by a (filesystem, name, fileset) identifier. Note that generally in the current (summer 2020) setup the "fileset" will be "root". These records are intended to track filesets owned by ACCRE groups on /data or /dors, not anything relating to user home or scratch quotas. :param str filesystem: GPFS filesystem record refers to :param str name: Fileset identifier :param str fileset: Parent fileset, typically root :param int block_quota: (soft) quota in bytes :param int block_limit: (hard quota) limit in bytes :param int file_quota: (soft) quota in number of files :param int file_limit: (hard quota) limit in number of files :param int gid: Numeric GID of the group that owns the fileset :param str path: The path that the fileset is linked to in GPFS :param str user_path: The path that the user should access the fileset by, either via symlink, bind mount, or other method :param bool active: If this fileset should still exist on the cluster :param datetime.datetime join_date: Date the filesystem added to the cluster, defaults to the current date """ if join_date is None: join_date = datetime.now() with self.engine.begin() as conn: ins = GPFS_FILESETS.insert().values( filesystem=filesystem, name=name, fileset=fileset, block_quota=block_quota, block_limit=block_limit, file_quota=file_quota, file_limit=file_limit, gid=gid, path=path, user_path=user_path, active=active, join_date=join_date ) conn.execute(ins)
[docs] def gpfs_fileset_info(self, *, filesystem, name, fileset='root', usage=False ): """ Retrieve general information for a GPFS fileset. If usage is set to True, also look up most recent usage record for the fileset and give the time of the last usage check. Usage will be set to None if the there is no usage record available in the database :param str filesystem: GPFS filesystem record refers to :param str name: Fileset identifier :param str fileset: Parent fileset, typically root :returns: General fileset information :rtype: dict """ with self.engine.connect() as conn: s = select([GPFS_FILESETS]).where( (GPFS_FILESETS.c.filesystem == filesystem) & (GPFS_FILESETS.c.name == name) & (GPFS_FILESETS.c.fileset == fileset) ) fs = conn.execute(s).fetchone() result = dict(fs) result['group_name'] = '' gid = result['gid'] s = select([GROUPS]).where(GROUPS.c.group_id == gid) group = conn.execute(s).fetchone() if group is not None: result['group_name'] = group['name'] if usage: result['block_usage'] = None result['file_usage'] = None result['last_usage_check'] = None s = select([GPFS_USAGE]).where( (GPFS_USAGE.c.filesystem == result['filesystem']) & (GPFS_USAGE.c.name == result['name']) & (GPFS_USAGE.c.fileset == result['fileset']) ) usage = conn.execute(s).fetchone() if usage is not None: result['block_usage'] = usage['block_usage'] result['file_usage'] = usage['file_usage'] result['last_usage_check'] = usage['last_check'] return result
[docs] def all_gpfs_fileset_info(self, active=True): """ Retrieve general information about all active filesets, or optionall all filesets in the database :param bool active: GPFS filesystem record refers to :returns: General fileset information for all filesets :rtype: list(dict) """ with self.engine.connect() as conn: if active: s = select([GPFS_FILESETS]).where( GPFS_FILESETS.c.active == True ) else: s = select([GPFS_FILESETS]) fs = [dict(f) for f in conn.execute(s).fetchall()] return fs
[docs] def modify_gpfs_fileset(self, *, filesystem, name, fileset='root', **updates ): """ Modify one or more fields of an existing GPFS fileset. Note that this method does not log the change. If the associated group exists, it is suggested to add a log entry to that group. :param str filesystem: GPFS filesystem record refers to :param str name: Fileset identifier :param str fileset: Parent fileset, typically root :param dict updates: Columns to be updated with their values """ invalid_fields = [ c for c in updates if c not in GPFS_FILESETS.columns ] if invalid_fields: raise ACCREValueError( 'Invalid account fields for update: {}'.format(invalid_fields) ) with self.engine.begin() as conn: up = GPFS_FILESETS.update().values(**updates).where( (GPFS_FILESETS.c.filesystem == filesystem) & (GPFS_FILESETS.c.name == name) & (GPFS_FILESETS.c.fileset == fileset) ) conn.execute(up)