"""
Main module for the panfs functionality
"""
import subprocess
import grp
from datetime import datetime
from accre.database.model import PANFS_VOLUMES, PANFS_USAGE
from accre.util import convert_byte_unit
from accre.logger import get_logger
from accre.filesystem.symlinks import (
create_or_update_symlink,
delete_symlink as del_symlink
)
from accre.filesystem.basefilesystem import GenericFileSystemMixin
from .constants import (
COULDNT_CREATE_SYMLINK_MSG,
DEFAULT_MOUNT_POINT,
PanfsCommandError,
PanasasVolume,
PanasasVolumePartial,
)
from .utils import (
remove_slash,
get_key_value_from_raw,
run_panfs_command,
parse_volume_list,
get_group_from_volume_name,
parse_panfs_userquota_output
)
logger = get_logger()
[docs]class PanasasFileSystem(GenericFileSystemMixin):
"""
FileSystem client for PanFS
"""
database_table = PANFS_VOLUMES
usage_table = PANFS_USAGE
data_model = PanasasVolume
data_model_partial = PanasasVolumePartial
# Make use of this if the volume doesn't match correctly with the group name
mis_matched_volume_groups = {}
[docs] def process_fields_db(self, validated_volume_input, insert=False) -> dict:
"""
Process the fields so that they can be inserted into the database
for example: if hard_quota is 7 TB, then convert it to bytes and return that value
:param PanasasVolume validated_volume_input: The validated input for the volume.
:param bool insert: If True, the input is for an insert operation. If False, the input is for an update operation.
:returns: valid dict that can be inserted into volume database
:rtype: dict
"""
return_dict = {**validated_volume_input.dict()}
if isinstance(validated_volume_input.hard_quota, str):
return_dict["hard_quota"] = int(convert_byte_unit(validated_volume_input.hard_quota, target='B'))
if isinstance(validated_volume_input.soft_quota, str):
return_dict["soft_quota"] = int(convert_byte_unit(validated_volume_input.soft_quota, target='B'))
if insert and return_dict.get("join_date") is None:
return_dict["join_date"] = datetime.now()
col_names = [col.name for col in self.database_table.columns]
return_dict = {
k: v for k, v in return_dict.items()
if k in col_names and return_dict[k]
}
return return_dict
[docs] def process_usage_fields(self, validated_volume_input: PanasasVolume, usage_details: dict):
"""
process the fields so that:
1. usage table entry can be identified uniquely (mostly from validated_volume_input)
2. usage table can be populated based on usage_details
:param VolumeDataModel volume_input: input that is validated against the pydantic model and can be used to retrieve unique item from the database
:param dict usage_details: the usage details of the volume
:returns: valid dict that can be inserted into the usage database
:rtype: dict
"""
user = None
for user_prefix in ["/nobackup/userspace", "/home"]:
if user_prefix in validated_volume_input.name:
user = validated_volume_input.name.split(user_prefix)[1]
user = remove_slash(user)
break
# this takes care of both None or empty string
# we make it none to keep the database consistent
if not user:
user = None
return_val = {
"bladeset": validated_volume_input.bladeset,
"name": validated_volume_input.name,
"user": user,
"extra_info": usage_details.get("extra_info", {})
}
for key in ["space_used", "hard_quota", "soft_quota"]:
if key in usage_details:
return_val[key] = usage_details[key]
if isinstance(usage_details.get(key), str):
try:
return_val[key] = int(convert_byte_unit(return_val[key], target='B'))
except ValueError:
return_val[key] = 0
return return_val
[docs] @staticmethod
def modify_soft_quota(volume_name, new_soft_quota, timeout=600):
"""
Modify the soft quota of a volume.
:param str volume_name: The name of the volume.
:param int new_soft_quota: The new soft quota in GB.
:param int timeout: The maximum time in seconds to wait for the command to complete.
:returns: The output of the command.
"""
args = [
'volume',
'set',
'soft-quota',
volume_name,
str(new_soft_quota),
]
soft_output = run_panfs_command(args, timeout=timeout)
return soft_output
[docs] @staticmethod
def modify_hard_quota(volume_name, new_hard_quota, timeout=600):
"""
Modify the hard quota of a volume.
:param str volume_name: The name of the volume.
:param int new_hard_quota: The new hard quota in GB.
:param int timeout: The maximum time in seconds to wait for the command to complete.
:returns: The output of the command.
"""
args = [
'volume',
'set',
'hard-quota',
volume_name,
str(new_hard_quota)
]
hard_output = run_panfs_command(args, timeout=timeout)
return hard_output
[docs] def get_volume_fs(self, validated_volume_input, update_usage=False, timeout=600):
"""
Get the details of a volume from the file system.
:param PanasasVolumePartial validated_volume_input: The validated input for the volume.
:param bool update_usage: Whether to update the usage table. Default is False.
:param int timeout: The maximum time in seconds to wait for the command to complete.
:returns: The output of the command. Raises exception if no volume is found in the filesystem.
"""
args = [
'volume',
'details',
validated_volume_input.name
]
vol_info_raw = run_panfs_command(args, timeout=timeout)
fs_output = get_key_value_from_raw(vol_info_raw)
if update_usage:
# by product is that we update the usage table
self.add_or_update_usage_table(validated_volume_input, fs_output)
return fs_output
[docs] def modify_volume_fs(self, validated_volume_input: PanasasVolumePartial):
"""
Modify a volume in the file system.
:param PanasasVolumePartial validated_volume_input: The validated input for the volume.
:returns bool, str, dict: The success of the operation, the message, and the updated volume info.
"""
soft_quota = convert_byte_unit(validated_volume_input.soft_quota, target='GB', ieee=True)
hard_quota = convert_byte_unit(validated_volume_input.hard_quota, target='GB', ieee=True)
volume_name = validated_volume_input.name
curr_info = self.get_volume_fs(validated_volume_input)
old_soft_quota = convert_byte_unit(curr_info["soft_quota"], target='GB', ieee=True)
comp_quota = int(hard_quota)
if comp_quota <= old_soft_quota:
self.modify_soft_quota(volume_name, soft_quota)
self.modify_hard_quota(volume_name, hard_quota)
else:
self.modify_hard_quota(volume_name, hard_quota)
self.modify_soft_quota(volume_name, soft_quota)
# self testing code
updated_vol_info = self.get_volume_fs(validated_volume_input, update_usage=True)
updated_soft_quota = convert_byte_unit(updated_vol_info["soft_quota"], target='GB', ieee=True)
updated_hard_quota = convert_byte_unit(updated_vol_info["hard_quota"], target='GB', ieee=True)
try:
assert updated_soft_quota == soft_quota
assert updated_hard_quota == hard_quota
except AssertionError:
return False, "Volume modification failed", None
return True, "", updated_vol_info
[docs] def add_volume_fs(self, validated_volume_input: PanasasVolume):
"""
Add a volume to the file system.
:param PanasasVolume validated_volume_input: The validated input for the volume.
:returns: The success of the operation, the message, and the updated volume info.
:rtype: bool, str, dict
"""
try:
# test if the volume already exists
# expect it to fail
vol_info = self.get_volume_fs(validated_volume_input)
if vol_info is not None:
return False, "Volume already exists", vol_info
except PanfsCommandError:
pass
soft_quota_gb = convert_byte_unit(validated_volume_input.soft_quota, target='GB', ieee=True)
hard_quota_gb = convert_byte_unit(validated_volume_input.hard_quota, target='GB', ieee=True)
# defaults to accre group if the group was not found
try:
gid = grp.getgrnam(validated_volume_input.group)[2]
except Exception:
gid = grp.getgrnam("accre")[2]
args = [
'volume',
'create',
validated_volume_input.name,
'bladeset',
f'"{validated_volume_input.bladeset}"',
'soft',
str(int(soft_quota_gb)),
'hard',
str(int(hard_quota_gb)),
'uperm',
'all',
'gperm',
'all',
'operm',
'none',
'group',
str(gid)
]
output = run_panfs_command(args, timeout=600)
vol_info = self.get_volume_fs(validated_volume_input, update_usage=True)
if vol_info is None:
return False, f"Volume creation failed: {output}", None
return True, f"Volume created successfully: {output}", vol_info
[docs] def remove_volume_fs(self, volume_input: PanasasVolume):
"""
Remove a volume from the file system.
:param PanasasVolume validated_volume_input: The validated input for the volume.
:returns: The success of the operation, the message, and the updated volume info.
:rtype: bool, str, dict
"""
print("Cannot perform this operation. Please do it manually.")
print("To do so, open up a python shell and run the following as a root user:")
print("\t ssh admin@accrepfs.vampire")
print(f"\t volume delete {volume_input['name']}")
return False, "Volume deletion failed", None
[docs] def delete_symlink(self, validated_volume_input: PanasasVolume):
"""
Delete symlinks for the volume.
Prints the output of the symlinks command as a side effect
:param PanasasVolume validated_volume_input: The validated input for the volume.
:returns: None
"""
fileset_type = None
symlink_name = None
if "/nobackup" in validated_volume_input.name:
fileset_type = "nobackup"
elif "/dors" in validated_volume_input.name:
fileset_type = "dors"
elif "/data" in validated_volume_input.name:
fileset_type = "data"
else:
error = f"Could not determine fileset type for volume {validated_volume_input.name}."
logger.error(COULDNT_CREATE_SYMLINK_MSG.format(error=error))
return False
try:
# programmatically add symlinks.
symlink_name = validated_volume_input.name.split(f"/{fileset_type}")[1]
symlink_name = remove_slash(symlink_name)
except IndexError:
error = f"Could not determine symlink name for volume {validated_volume_input.name}."
logger.error(COULDNT_CREATE_SYMLINK_MSG.format(error=error))
return False
del_symlink(
symlink_name,
fileset_type
)
[docs] def create_symlink(self, validated_volume_input: PanasasVolume):
"""
Create symlinks for the volume.
:param PanasasVolume validated_volume_input: The validated input for the volume.
:returns: Whether the symlink was created successfully
:rtype: bool
"""
fileset_type = None
symlink_name = None
if "/nobackup" in validated_volume_input.name:
fileset_type = "nobackup"
elif "/dors" in validated_volume_input.name:
fileset_type = "dors"
elif "/data" in validated_volume_input.name:
fileset_type = "data"
else:
error = f"Could not determine fileset type for volume {validated_volume_input.name}."
logger.error(COULDNT_CREATE_SYMLINK_MSG.format(error=error))
return False
try:
# programmatically add symlinks.
symlink_name = validated_volume_input.name.split(f"/{fileset_type}")[1]
symlink_name = remove_slash(symlink_name)
except IndexError:
error = f"Could not determine symlink name for volume {validated_volume_input.name}."
logger.error(COULDNT_CREATE_SYMLINK_MSG.format(error=error))
return False
create_or_update_symlink(
symlink_name,
validated_volume_input.path,
fileset_type
)
return True
[docs] def deploy_symlinks(self):
"""
Deploy symlinks from the database.
"""
logger.info("Deploying the symlinks")
arglist = [
"/accre/cm/ansible/utilities/symlinks.sh",
"-o /accre/var/computed_filesets.yml",
"-d"
]
proc = subprocess.Popen(
arglist,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL
)
stdout, stderr = proc.communicate(timeout=600)
if proc.returncode != 0:
error = f'Symlink deployment failed with exit code {proc.returncode}: {stderr}.'
logger.error(COULDNT_CREATE_SYMLINK_MSG.format(error=error))
return
logger.info(stdout.decode('utf-8'))
return None
# PUBLIC FACING FUNCTION
[docs] def get_volumes_fs(self):
"""
This function will get the volumes from the filesystem
If volume_input is None, it will get all the volumes in the filesystem
:returns: The list of volumes in the filesystem as a list of dicts. Dict is of format:
.. code-block:: python
name[str]: The name of the volume
bladeset[str]: The bladeset of the volume
group[str]: The group that volume belongs to, default "accre" if no group association found
soft_quota[bigint]: soft quota in bytes
hard_quota[bigint]: hard quota in bytes
space_used[bigint]: space used in bytes
user_path[str]: Path as seen by the user
path: "{DEFAULT_MOUNT_POINT}/ user_path", path as seen by the system
user[str] (optional): The user of the volume, if it is a userspace volume
:rtype: list[dict]
"""
arglist = ["volume", "list", "show", "all"]
stdout = run_panfs_command(arglist)
fs_volumes = parse_volume_list(stdout)
return_list = []
for vol in fs_volumes:
quotas = {}
for quota in ["soft_quota", "hard_quota", "space_used"]:
try:
quotas[quota] = convert_byte_unit(vol[quota], target='B')
except Exception:
# some values exist like "---" that don't parse to bytes
# set it to -1 to indicate that it is not set
quotas[quota] = 0.0
# for volume table
group = get_group_from_volume_name(vol["name"], volume_group_mapping=self.mis_matched_volume_groups)
return_list.append({
"name": vol["name"],
"bladeset": vol["bladeset"],
"group": group,
"soft_quota": quotas["soft_quota"],
"hard_quota": quotas["hard_quota"],
"space_used": quotas["space_used"],
"path": f"{DEFAULT_MOUNT_POINT}{vol['name']}",
"user_path": vol["name"],
"extra_info": {}
})
return return_list
[docs] def get_users_usage_fs(self):
"""
Get the usage of a userspace volume for all users from the filesystem.
:param str userspace_path: The path of the userspace volume. Example, /nobackup/userspace, /home
:returns: The usage of the userspace volume. Example:
.. code-block:: python
"name": usage["name"],
"bladeset": fs_volume["bladeset"],
"soft_quota" (float): usage["space_soft_quota"],
"hard_quota" (float): usage["space_hard_quota"],
"space_used" (int B): usage["space_used"],
"path": f"{DEFAULT_MOUNT_POINT}/{fs_volume['name']}",
"user_path": fs_volume["name"],
"extra_info": {
"files_used" (int): usage["files_used"],
"files_soft_quota" (float): usage["files_soft_quota"],
"files_hard_quota" (float): usage["files_hard_quota"],
}
:rtype: dict
"""
return_list = []
fs_vols = self.get_volumes_fs()
for userspace_path in ["/nobackup/userspace", "/home"]:
fs_volume = [vol for vol in fs_vols if vol["name"] == userspace_path]
if len(fs_volume) == 0:
logger.info("While trying to get users usage, no volume found for %s", userspace_path)
continue
elif len(fs_volume) > 1:
logger.info("While trying to get users usage, multiple volume found for %s", userspace_path)
continue
fs_volume = fs_volume[0]
# if the path is /nobackup/userspace, set the bladeset
arglist = ["userquota", "usage", "-volume", userspace_path]
output = run_panfs_command(arglist, timeout=600)
all_usage = parse_panfs_userquota_output(output, volume_prefix=userspace_path)
for usage in all_usage:
return_list.append({
"name": usage["name"],
"bladeset": fs_volume["bladeset"],
"soft_quota": usage["space_soft_quota"],
"hard_quota": usage["space_hard_quota"],
"space_used": usage["space_used"],
"path": f"{DEFAULT_MOUNT_POINT}/{fs_volume['name']}",
"user_path": fs_volume["name"],
"extra_info": {
"files_used": usage["files_used"],
"files_soft_quota": usage["files_soft_quota"],
"files_hard_quota": usage["files_hard_quota"],
}
})
return return_list