Source code for accre.filesystem.panfs.main

"""
Main module for the panfs functionality
"""

import subprocess
import grp
from datetime import datetime

from accre.database.model import PANFS_VOLUMES, PANFS_USAGE
from accre.util import convert_byte_unit
from accre.logger import get_logger

from accre.filesystem.symlinks import (
    create_or_update_symlink,
    delete_symlink as del_symlink
)
from accre.filesystem.basefilesystem import GenericFileSystemMixin

from .constants import (
    COULDNT_CREATE_SYMLINK_MSG,
    DEFAULT_MOUNT_POINT,
    PanfsCommandError,
    PanasasVolume,
    PanasasVolumePartial,
)
from .utils import (
    remove_slash,
    get_key_value_from_raw,
    run_panfs_command,
    parse_volume_list,
    get_group_from_volume_name,
    parse_panfs_userquota_output
)

logger = get_logger()

[docs]class PanasasFileSystem(GenericFileSystemMixin): """ FileSystem client for PanFS """ database_table = PANFS_VOLUMES usage_table = PANFS_USAGE data_model = PanasasVolume data_model_partial = PanasasVolumePartial # Make use of this if the volume doesn't match correctly with the group name mis_matched_volume_groups = {}
[docs] def process_fields_db(self, validated_volume_input, insert=False) -> dict: """ Process the fields so that they can be inserted into the database for example: if hard_quota is 7 TB, then convert it to bytes and return that value :param PanasasVolume validated_volume_input: The validated input for the volume. :param bool insert: If True, the input is for an insert operation. If False, the input is for an update operation. :returns: valid dict that can be inserted into volume database :rtype: dict """ return_dict = {**validated_volume_input.dict()} if isinstance(validated_volume_input.hard_quota, str): return_dict["hard_quota"] = int(convert_byte_unit(validated_volume_input.hard_quota, target='B')) if isinstance(validated_volume_input.soft_quota, str): return_dict["soft_quota"] = int(convert_byte_unit(validated_volume_input.soft_quota, target='B')) if insert and return_dict.get("join_date") is None: return_dict["join_date"] = datetime.now() col_names = [col.name for col in self.database_table.columns] return_dict = { k: v for k, v in return_dict.items() if k in col_names and return_dict[k] } return return_dict
[docs] def process_usage_fields(self, validated_volume_input: PanasasVolume, usage_details: dict): """ process the fields so that: 1. usage table entry can be identified uniquely (mostly from validated_volume_input) 2. usage table can be populated based on usage_details :param VolumeDataModel volume_input: input that is validated against the pydantic model and can be used to retrieve unique item from the database :param dict usage_details: the usage details of the volume :returns: valid dict that can be inserted into the usage database :rtype: dict """ user = None for user_prefix in ["/nobackup/userspace", "/home"]: if user_prefix in validated_volume_input.name: user = validated_volume_input.name.split(user_prefix)[1] user = remove_slash(user) break # this takes care of both None or empty string # we make it none to keep the database consistent if not user: user = None return_val = { "bladeset": validated_volume_input.bladeset, "name": validated_volume_input.name, "user": user, "extra_info": usage_details.get("extra_info", {}) } for key in ["space_used", "hard_quota", "soft_quota"]: if key in usage_details: return_val[key] = usage_details[key] if isinstance(usage_details.get(key), str): try: return_val[key] = int(convert_byte_unit(return_val[key], target='B')) except ValueError: return_val[key] = 0 return return_val
[docs] @staticmethod def modify_soft_quota(volume_name, new_soft_quota, timeout=600): """ Modify the soft quota of a volume. :param str volume_name: The name of the volume. :param int new_soft_quota: The new soft quota in GB. :param int timeout: The maximum time in seconds to wait for the command to complete. :returns: The output of the command. """ args = [ 'volume', 'set', 'soft-quota', volume_name, str(new_soft_quota), ] soft_output = run_panfs_command(args, timeout=timeout) return soft_output
[docs] @staticmethod def modify_hard_quota(volume_name, new_hard_quota, timeout=600): """ Modify the hard quota of a volume. :param str volume_name: The name of the volume. :param int new_hard_quota: The new hard quota in GB. :param int timeout: The maximum time in seconds to wait for the command to complete. :returns: The output of the command. """ args = [ 'volume', 'set', 'hard-quota', volume_name, str(new_hard_quota) ] hard_output = run_panfs_command(args, timeout=timeout) return hard_output
[docs] def get_volume_fs(self, validated_volume_input, update_usage=False, timeout=600): """ Get the details of a volume from the file system. :param PanasasVolumePartial validated_volume_input: The validated input for the volume. :param bool update_usage: Whether to update the usage table. Default is False. :param int timeout: The maximum time in seconds to wait for the command to complete. :returns: The output of the command. Raises exception if no volume is found in the filesystem. """ args = [ 'volume', 'details', validated_volume_input.name ] vol_info_raw = run_panfs_command(args, timeout=timeout) fs_output = get_key_value_from_raw(vol_info_raw) if update_usage: # by product is that we update the usage table self.add_or_update_usage_table(validated_volume_input, fs_output) return fs_output
[docs] def modify_volume_fs(self, validated_volume_input: PanasasVolumePartial): """ Modify a volume in the file system. :param PanasasVolumePartial validated_volume_input: The validated input for the volume. :returns bool, str, dict: The success of the operation, the message, and the updated volume info. """ soft_quota = convert_byte_unit(validated_volume_input.soft_quota, target='GB', ieee=True) hard_quota = convert_byte_unit(validated_volume_input.hard_quota, target='GB', ieee=True) volume_name = validated_volume_input.name curr_info = self.get_volume_fs(validated_volume_input) old_soft_quota = convert_byte_unit(curr_info["soft_quota"], target='GB', ieee=True) comp_quota = int(hard_quota) if comp_quota <= old_soft_quota: self.modify_soft_quota(volume_name, soft_quota) self.modify_hard_quota(volume_name, hard_quota) else: self.modify_hard_quota(volume_name, hard_quota) self.modify_soft_quota(volume_name, soft_quota) # self testing code updated_vol_info = self.get_volume_fs(validated_volume_input, update_usage=True) updated_soft_quota = convert_byte_unit(updated_vol_info["soft_quota"], target='GB', ieee=True) updated_hard_quota = convert_byte_unit(updated_vol_info["hard_quota"], target='GB', ieee=True) try: assert updated_soft_quota == soft_quota assert updated_hard_quota == hard_quota except AssertionError: return False, "Volume modification failed", None return True, "", updated_vol_info
[docs] def add_volume_fs(self, validated_volume_input: PanasasVolume): """ Add a volume to the file system. :param PanasasVolume validated_volume_input: The validated input for the volume. :returns: The success of the operation, the message, and the updated volume info. :rtype: bool, str, dict """ try: # test if the volume already exists # expect it to fail vol_info = self.get_volume_fs(validated_volume_input) if vol_info is not None: return False, "Volume already exists", vol_info except PanfsCommandError: pass soft_quota_gb = convert_byte_unit(validated_volume_input.soft_quota, target='GB', ieee=True) hard_quota_gb = convert_byte_unit(validated_volume_input.hard_quota, target='GB', ieee=True) # defaults to accre group if the group was not found try: gid = grp.getgrnam(validated_volume_input.group)[2] except Exception: gid = grp.getgrnam("accre")[2] args = [ 'volume', 'create', validated_volume_input.name, 'bladeset', f'"{validated_volume_input.bladeset}"', 'soft', str(int(soft_quota_gb)), 'hard', str(int(hard_quota_gb)), 'uperm', 'all', 'gperm', 'all', 'operm', 'none', 'group', str(gid) ] output = run_panfs_command(args, timeout=600) vol_info = self.get_volume_fs(validated_volume_input, update_usage=True) if vol_info is None: return False, f"Volume creation failed: {output}", None return True, f"Volume created successfully: {output}", vol_info
[docs] def remove_volume_fs(self, volume_input: PanasasVolume): """ Remove a volume from the file system. :param PanasasVolume validated_volume_input: The validated input for the volume. :returns: The success of the operation, the message, and the updated volume info. :rtype: bool, str, dict """ print("Cannot perform this operation. Please do it manually.") print("To do so, open up a python shell and run the following as a root user:") print("\t ssh admin@accrepfs.vampire") print(f"\t volume delete {volume_input['name']}") return False, "Volume deletion failed", None
# PUBLIC FACING FUNCTION
[docs] def get_volumes_fs(self): """ This function will get the volumes from the filesystem If volume_input is None, it will get all the volumes in the filesystem :returns: The list of volumes in the filesystem as a list of dicts. Dict is of format: .. code-block:: python name[str]: The name of the volume bladeset[str]: The bladeset of the volume group[str]: The group that volume belongs to, default "accre" if no group association found soft_quota[bigint]: soft quota in bytes hard_quota[bigint]: hard quota in bytes space_used[bigint]: space used in bytes user_path[str]: Path as seen by the user path: "{DEFAULT_MOUNT_POINT}/ user_path", path as seen by the system user[str] (optional): The user of the volume, if it is a userspace volume :rtype: list[dict] """ arglist = ["volume", "list", "show", "all"] stdout = run_panfs_command(arglist) fs_volumes = parse_volume_list(stdout) return_list = [] for vol in fs_volumes: quotas = {} for quota in ["soft_quota", "hard_quota", "space_used"]: try: quotas[quota] = convert_byte_unit(vol[quota], target='B') except Exception: # some values exist like "---" that don't parse to bytes # set it to -1 to indicate that it is not set quotas[quota] = 0.0 # for volume table group = get_group_from_volume_name(vol["name"], volume_group_mapping=self.mis_matched_volume_groups) return_list.append({ "name": vol["name"], "bladeset": vol["bladeset"], "group": group, "soft_quota": quotas["soft_quota"], "hard_quota": quotas["hard_quota"], "space_used": quotas["space_used"], "path": f"{DEFAULT_MOUNT_POINT}{vol['name']}", "user_path": vol["name"], "extra_info": {} }) return return_list
[docs] def get_users_usage_fs(self): """ Get the usage of a userspace volume for all users from the filesystem. :param str userspace_path: The path of the userspace volume. Example, /nobackup/userspace, /home :returns: The usage of the userspace volume. Example: .. code-block:: python "name": usage["name"], "bladeset": fs_volume["bladeset"], "soft_quota" (float): usage["space_soft_quota"], "hard_quota" (float): usage["space_hard_quota"], "space_used" (int B): usage["space_used"], "path": f"{DEFAULT_MOUNT_POINT}/{fs_volume['name']}", "user_path": fs_volume["name"], "extra_info": { "files_used" (int): usage["files_used"], "files_soft_quota" (float): usage["files_soft_quota"], "files_hard_quota" (float): usage["files_hard_quota"], } :rtype: dict """ return_list = [] fs_vols = self.get_volumes_fs() for userspace_path in ["/nobackup/userspace", "/home"]: fs_volume = [vol for vol in fs_vols if vol["name"] == userspace_path] if len(fs_volume) == 0: logger.info("While trying to get users usage, no volume found for %s", userspace_path) continue elif len(fs_volume) > 1: logger.info("While trying to get users usage, multiple volume found for %s", userspace_path) continue fs_volume = fs_volume[0] # if the path is /nobackup/userspace, set the bladeset arglist = ["userquota", "usage", "-volume", userspace_path] output = run_panfs_command(arglist, timeout=600) all_usage = parse_panfs_userquota_output(output, volume_prefix=userspace_path) for usage in all_usage: return_list.append({ "name": usage["name"], "bladeset": fs_volume["bladeset"], "soft_quota": usage["space_soft_quota"], "hard_quota": usage["space_hard_quota"], "space_used": usage["space_used"], "path": f"{DEFAULT_MOUNT_POINT}/{fs_volume['name']}", "user_path": fs_volume["name"], "extra_info": { "files_used": usage["files_used"], "files_soft_quota": usage["files_soft_quota"], "files_hard_quota": usage["files_hard_quota"], } }) return return_list