Source code for accre.ldap

"""
Tools to interface with the ACCRE internal and Vanderbilt LDAP servers
including a client classes and a cli entry point.
"""
import copy
import crypt
import datetime
import json
import re
import sys

import ldap3

from accre.config import get_config
from accre.exceptions import ACCREValueError, ACCREKeyError
import accre.util
from accre.util import (
    get_posixuser, get_posixgroup, get_shadowuser,
    PosixUser, ShadowUser, PosixGroup,
    accre_argparser,
    RedStr, GreenStr
)


CONFIG = get_config()


# Unset password regexp, used to detect unset passwords in /etc/group
# which are usually 'x' but could potentially be '*' or '!!'
UNSET_PASSWORD_RE = re.compile(r'^(x|!|!!|\*)$')


[docs]class BaseLDAP: """ Base LDAP client for configured services In order to improve performance when checking multiple VUNetID entries, instantiate the client with caching=True to avoid repeatedly fetching the same ID. """ config_section = '' def __init__(self, caching=False, ldap_id=None, auth=None, rootdn=False): cacert = CONFIG[self.config_section].get('cacert', None) if rootdn: try: ldap_id = CONFIG[self.config_section]['rootdn_id'] auth = CONFIG[self.config_section]['rootdn_auth'] except KeyError: raise ACCREKeyError( 'Root dn credentials are not set for your configuration. ' 'Ensure you are authorized and are on the correct host.' ) self.ldap_id = ldap_id self.auth = auth if self.ldap_id is None: self.ldap_id = CONFIG[self.config_section]['id'] self.auth = CONFIG[self.config_section]['auth'] if cacert is None: self.server = ldap3.Server( CONFIG[self.config_section]['server'], use_ssl=True, get_info=ldap3.ALL ) else: tls = ldap3.Tls(ca_certs_file=cacert) self.server = ldap3.Server( CONFIG[self.config_section]['server'], use_ssl=True, get_info=ldap3.ALL, tls=tls ) self.conn = None self.caching = caching self.userids = {} self.groupids = {} self.num_uids = {} self.num_gids = {} def __enter__(self): self.connect() return self def __exit__(self, *args): self.disconnect()
[docs] def connect(self): """ Bind a new connection to the configured LDAP server """ if self.conn and self.conn.bound: self.disconnect() # Needed for unit testing - connection to a mock server requires # a special call if self.server.host == 'unit_test_server': self.conn = ldap3.Connection( self.server, self.ldap_id, self.auth, client_strategy=ldap3.MOCK_SYNC ) self.conn.bind() return self.conn = ldap3.Connection( self.server, self.ldap_id, self.auth ) self.conn.bind()
[docs] def disconnect(self): """ Unbind the current connection to the configured LDAP server """ if self.conn is not None: self.conn.unbind()
def _fetch_user(self, user, search_path): """ Fetch and return a posix user from the server. If caching is true and it is in the cache, return the cached value. Raise an exception if the user is not found on the server. """ if self.caching and user in self.userids: if self.userids[user] is not None: return self.userids[user] else: raise ACCREValueError( 'User {0} was not found on the server.'.format(user) ) self.conn.search( search_path, '(uid={0})'.format(user), attributes=['*'] ) if not self.conn.entries: if self.caching: self.userids[user] = None raise ACCREValueError( 'User {0} was not found on the server.'.format(user) ) entry = self.conn.entries[0] if self.caching: self.userids[user] = entry return entry def _fetch_group(self, group, search_path): """ Fetch and return a posix group from the server. If caching is true and it is in the cache, return the cached value. Raise an exception if the user is not found on the server. """ if self.caching and group in self.groupids: if self.groupids[group] is not None: return self.groupids[group] else: raise ACCREValueError( 'Group {0} was not found on the server.'.format(group) ) self.conn.search( search_path, '(cn={0})'.format(group), attributes=['*'] ) if not self.conn.entries: if self.caching: self.groupids[group] = None raise ACCREValueError( 'Group {0} was not found on the server.'.format(group) ) entry = self.conn.entries[0] if self.caching: self.groupids[group] = entry return entry def _fetch_num_uid(self, uid, search_path): """ Fetch and return a posix user from the server via numeric UID. If caching is true and it is in the cache, return the cached value. Raise an exception if the user is not found on the server. """ if self.caching and uid in self.num_uids: if self.num_uids[uid] is not None: return self.num_uids[uid] else: raise ACCREValueError( 'UID {0} was not found on the server.'.format(uid) ) self.conn.search( search_path, '(uidNumber={0})'.format(uid), attributes=['*'] ) if not self.conn.entries: if self.caching: self.num_uids[uid] = None raise ACCREValueError( 'UID {0} was not found on the server.'.format(uid) ) entry = self.conn.entries[0] if self.caching: self.num_uids[uid] = entry return entry def _fetch_num_gid(self, gid, search_path): """ Fetch and return a posix group from the server via numeric GID. If caching is true and it is in the cache, return the cached value. Raise an exception if the user is not found on the server. """ if self.caching and gid in self.num_gids: if self.num_gids[gid] is not None: return self.num_gids[gid] else: raise ACCREValueError( 'GID {0} was not found on the server.'.format(gid) ) self.conn.search( search_path, '(gidNumber={0})'.format(gid), attributes=['*'] ) if not self.conn.entries: if self.caching: self.num_gids[gid] = None raise ACCREValueError( 'GID {0} was not found on the server.'.format(gid) ) entry = self.conn.entries[0] if self.caching: self.num_gids[gid] = entry return entry
[docs]class ACCRELDAP(BaseLDAP): """ Client for the ACCRE internal LDAP Service. In order to improve performance when checking multiple VUNetID entries, instantiate the client with caching=True to avoid repeatedly fetching the same ID. """ config_section = 'accreldap'
[docs] def user_record(self, vunetid): """ dump record of an accre user into a dictionary with all LDAP attributes :param str vunetid: ACCRE user username/VUNetID :returns: Dictionary with full LDAP record information """ self.conn.search( 'ou=People,dc=accre,dc=vanderbilt,dc=edu', '(uid={})'.format(vunetid), attributes=['*'] ) return json.loads(self.conn.entries[0].entry_to_json())
[docs] def add_user(self, *, username, passwd, uid, gid=None, homedir=None, shell=None, fullname='', lastpwchange=None, shadowmin=0, shadowmax=99999, shadowwarn=7 ): """ Add a new user account to the directory. The username, password, and uid are required. The gid will default to the uid, the shell to bash, the home directory to /home/<username>, and the last password change to the current date if not set. If an optional attribute is given in as the empty string, then that attribute will not be set in the new LDAP entity. :param str username: the username of the new user (required) :param str passwd: the LDAP-formatted passwd hash for the new user (required) :param int uid: the numeric UID of the new user (required) :param int gid: the numeric group ID of the new user :param str shell: the shell to use for the ner user :param str fullname: the real (full) name of the new user :param int lastpwchange: the number of days since the epoch of the last password change :param int shadowmin: minimum number of days before the user password can be changed :param int shadowmax: number of days since the last change before a user password expires :param int shadowwarn: number of days before expiring that a user will be warned to change their password :returns: True on success, False on failure :rtype: bool """ if gid is None: gid = uid if homedir is None: homedir = '/home/{0}'.format(username) if shell is None: shell = '/bin/bash' if lastpwchange is None: lastpwchange = ( (accre.util.utcnow() - datetime.datetime(1970,1,1)).days ) props = { "cn": username, "uidNumber": uid, "gidNumber": gid, "homeDirectory": homedir, "loginShell": shell, "gecos": fullname, "userPassword": passwd, "shadowLastChange": lastpwchange, "shadowMin": shadowmin, "shadowMax": shadowmax, "shadowWarning": shadowwarn } for key in list(props.keys()): if props[key] == '': del props[key] dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username) return self.conn.add( dn, ["top", "account", "posixAccount", "shadowAccount"], props )
[docs] def add_local_user(self, username): """ Add a user account record from a local user specified in /etc/passwd and /etc/shadow. This method requires that you are root. :param str username: Username to import to LDAP from the local files :returns: True on success, False on failure :rtype: bool """ posix = get_posixuser(username) shadow = get_shadowuser(username) return self.add_user( username=posix.name, passwd='{{CRYPT}}{0}'.format(shadow.password), uid=posix.uid, gid=posix.gid, fullname=posix.gecos, homedir=posix.homedir, shell=posix.shell, lastpwchange=shadow.lastchange, shadowmin=shadow.min, shadowmax=shadow.max, shadowwarn=shadow.warn )
[docs] def delete_user(self, username): """ Remove the specified user account from the directory. :param str username: the username to be removed :returns: True on success, False on failure :rtype: bool """ dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username) return self.conn.delete(dn)
[docs] def change_user_password(self, username, new_password): """ Sets the password of the user to the value ``new_password``. Note that this is the actual un-hashed password. The password is hashed using SHA512 with a salt of length 16. No strength checking is performed by this function. The shadowLastChange is also updated to the current date for the user. :param str username: the username to receive a password change :param str new_password: the unhashed value of the new password :returns: True on success, False on failure :rtype: bool """ dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username) newhash = crypt.crypt(new_password, salt=crypt.METHOD_SHA512) # hashing with METHOD_SHA512 may fail silently on systems that # do not support this in crypt. Check the length as a smoke test. if len(newhash) != 106: raise Exception( 'Error hashing new password, SHA512 unsupported by ' 'crypt on this system.' ) newhash = '{CRYPT}' + newhash lastchange = ( (accre.util.utcnow() - datetime.datetime(1970,1,1)).days ) return self.conn.modify( dn, { 'userPassword': [(ldap3.MODIFY_REPLACE, [newhash])], 'shadowLastChange': [(ldap3.MODIFY_REPLACE, [lastchange])] } )
[docs] def change_user_shell(self, username, shell): """ Sets the login shell of the user to the value ``new_password``. Note that this is the full path to the shell executable file. :param str username: the username to receive a password change :param str shell: the path to the new shell executable :returns: True on success, False on failure :rtype: bool """ dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username) return self.conn.modify( dn, {'loginShell': [(ldap3.MODIFY_REPLACE, [shell])]} )
[docs] def change_user_group(self, username, group): """ Sets the primary group of the user to the numeric GID of the given group name. :param str username: the username to receive a primary group change :param str group: the name of the new primary group :returns: True on success, False on failure :rtype: bool """ dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username) self.conn.search( 'ou=Groups,dc=accre,dc=vanderbilt,dc=edu', '(cn={0})'.format(group), attributes=['cn', 'gidNumber'] ) try: gid = self.conn.entries[0].gidNumber.value except IndexError: return False return self.conn.modify( dn, {'gidNumber': [(ldap3.MODIFY_REPLACE, [gid])]} )
[docs] def list_users(self, active=False): """ Return a list of usernames of all cluster users currently in the directory. :param bool active: Only list active users if set to true, where a user is defined as active if its shell is not /bin/false or /sbin/nologin :returns: All usernames of cluster users in the directory :rtype: list(str) """ nologins = ['/bin/false', '/sbin/nologin'] self.conn.search( 'ou=People,dc=accre,dc=vanderbilt,dc=edu', '(uid=*)', attributes=['uid', 'loginShell'] ) if active: return [ str(self.conn.entries[idx]['uid']) for idx in range(len(self.conn.entries)) if self.conn.entries[idx]['loginShell'] not in nologins ] else: return [ str(self.conn.entries[idx]['uid']) for idx in range(len(self.conn.entries)) ]
[docs] def add_group(self, *, name, gid, passwd='', description='', members=() ): """ Add a new posix group to the directory. The group name, and gid are required. The password, description, and members will default to being unset in the directory. If an optional attribute is given in as the empty string, then that attribute will not be set in the new LDAP entity, except for the members attribute which will not be set if if is an empty iterable. :param str name: the name of the new group (required) :param int gid: the numeric group ID :param str passwd: the group password hash (usually unset) :param list(str) members: member accounts of the group :returns: True on success, False on failure :rtype: bool """ if not members: members = '' props = { "cn": name, "gidNumber": gid, "description": description, "userPassword": passwd, "memberUid": members } for key in list(props.keys()): if props[key] == '': del props[key] dn = "cn={},ou=Groups,dc=accre,dc=vanderbilt,dc=edu".format(name) return self.conn.add( dn, ["top", "posixGroup"], props )
[docs] def add_local_group(self, name): """ Add a user group record from a local group specified in /etc/group. :param str name: Group name to import to LDAP from the local files :returns: True on success, False on failure :rtype: bool """ group = get_posixgroup(name) if group.password and not UNSET_PASSWORD_RE.match(group.password): password = '{{CRYPT}}{0}'.format(group.password) else: password = '' return self.add_group( name=group.name, passwd=password, gid=group.gid, members=group.members )
[docs] def delete_group(self, name): """ Remove the specified posix group from the directory. :param str name: the group name to be removed :returns: True on success, False on failure :rtype: bool """ dn = "cn={},ou=Groups,dc=accre,dc=vanderbilt,dc=edu".format(name) return self.conn.delete(dn)
[docs] def list_groups(self): """ Return a list of group names of all posix groups currently in the directory. :returns: All names of posix groups in the directory :rtype: list(str) """ self.conn.search( 'ou=Groups,dc=accre,dc=vanderbilt,dc=edu', '(cn=*)', attributes=['cn'] ) return [ str(self.conn.entries[idx]['cn']) for idx in range(len(self.conn.entries)) ]
[docs] def list_group_gids(self): """ Return a list of group names and corresponding numeric gids as tuples. :returns: All names and gids of posix groups in the directory :rtype: list(tuple(str, int)) """ self.conn.search( 'ou=Groups,dc=accre,dc=vanderbilt,dc=edu', '(cn=*)', attributes=['cn', 'gidNumber'] ) return [ (entry['cn'].values[0], entry['gidNumber'].values[0]) for entry in self.conn.entries ]
[docs] def group_membership(self): """ Return a dict keyed by group names with a list of members as the value for each group. :returns: Membership of all groups :rtype: dict(str,list(str)) """ self.conn.search( 'ou=Groups,dc=accre,dc=vanderbilt,dc=edu', '(cn=*)', attributes=['cn','memberUid'] ) result = {} for entry in self.conn.entries: result[_get_str(entry, 'cn')] = entry['memberUid'].values return result
[docs] def add_group_member(self, group, user): """ Make the specified group a secondary group for the specified user. :param str group: group to add the user to :param str user: user to add to the group :returns: True on success :rtype: bool """ return self.conn.modify( 'cn={0},ou=Groups,dc=accre,dc=vanderbilt,dc=edu'.format(group), {'memberUid': [(ldap3.MODIFY_ADD, [user])]} )
[docs] def remove_group_member(self, group, user): """ Remove the specified user from the specified group. Note: this method does not work for the primary group of a user. :param str group: group to remove the user from :param str user: user to remove from the group :returns: True on success :rtype: bool """ return self.conn.modify( 'cn={0},ou=Groups,dc=accre,dc=vanderbilt,dc=edu'.format(group), {'memberUid': [(ldap3.MODIFY_DELETE, [user])]} )
[docs] def exists(self, accreid): """ Check if the given ACCRE ID exists on the server :param str accreid: the ACCRE ID to be checked :returns: True if the ACCRE ID exists :rtype: bool """ try: self._fetch_accreid(accreid) return True except ACCREValueError: return False
[docs] def posixuser(self, accreid): """ Return a PosixUser named tuple with the user information equivalent to an /etc/passwd entry for the given ACCRE ID :param str accreid: the ACCRE ID to be checked :returns: Posix user information for the user :rtype: accre.util.PosixUser """ entry = self._fetch_accreid(accreid) return PosixUser( name=_get_str(entry, 'cn'), password='x', uid=int(_get_str(entry, 'uidNumber')), gid=int(_get_str(entry, 'gidNumber')), gecos=_get_str(entry, 'gecos'), homedir=_get_str(entry, 'homeDirectory'), shell=_get_str(entry, 'loginShell') )
[docs] def shadowuser(self, accreid): """ Return a ShadowUser named tuple with the user information equivalent to an /etc/shadow entry for the given ACCRE ID. Note that only the most privileged LDAP users will be able to access the actual password hash. Also note that inactive, expire, and res are unused fields and will be set to empty strings. :param str accreid: the ACCRE ID to be checked :returns: Shadow user information for the user :rtype: accre.util.ShadowUser """ entry = self._fetch_accreid(accreid) password = entry['userPassword'].raw_values[0].decode('ascii') if password.startswith('{CRYPT}'): password = password[7:] return ShadowUser( name=_get_str(entry, 'cn'), password=password, lastchange=_int_or_empty(_get_str(entry, 'shadowLastChange')), min=_int_or_empty(_get_str(entry, 'shadowMin')), max=_int_or_empty(_get_str(entry, 'shadowMax')), warn=_int_or_empty(_get_str(entry, 'shadowWarning')), inactive='', expire='', res='' )
[docs] def posixgroup(self, accregroup): """ Return a PosixGroup named tuple with the group information equivalent to an /etc/group entry for the given ACCRE group :param str accregroup: the ACCRE group to be checked :returns: Posix group information for the user :rtype: accre.util.PosixGroup """ entry = self._fetch_accregroup(accregroup) try: members = tuple(entry['memberUid'].values) except ( ldap3.core.exceptions.LDAPCursorError, ldap3.core.exceptions.LDAPKeyError ): members = () return PosixGroup( name=_get_str(entry, 'cn'), password='x', gid=int(_get_str(entry, 'gidNumber')), members=members )
[docs] def group_exists(self, accregroup): """ Check if the given ACCRE group exists on the server :param str accregroup: the ACCRE group name to be checked :returns: True if the ACCRE group exists :rtype: bool """ try: self._fetch_accregroup(accregroup) return True except ACCREValueError: return False
def _fetch_accreid(self, accreid): """ Fetch and return a accre user from the server. If caching is true and it is in the cache, return the cached value. Raise an exception if the accreid is not found on the server. """ return self._fetch_user( accreid, 'ou=People,dc=accre,dc=vanderbilt,dc=edu' ) def _fetch_accregroup(self, accregroup): """ Fetch and return an accre group from the server. If caching is true and it is in the cache, return the cached value. Raise an exception if the accre group is not found on the server. """ return self._fetch_group( accregroup, 'ou=Groups,dc=accre,dc=vanderbilt,dc=edu' ) def _fetch_robotid(self, robotid): """ Fetch and return an robot from the server. Raise an exception if the robot is not found on the server. Caching is not used for robots """ self.conn.search( 'ou=Robots,dc=accre,dc=vanderbilt,dc=edu', '(uid={0})'.format(robotid), attributes=['*'] ) if not self.conn.entries: raise ACCREValueError( 'Robot {0} was not found on the server.'.format(robotid) ) return self.conn.entries[0]
[docs] def robot_info(self, robotid): """ Return a dictionary of information about the specified robot vunetid-equivalent including the full name (fullname), uid (uid), Vanderbilt-equivalent email (vanderbilt_email), and titles, where titles indicate the purpose(s) of the robot and if it is a locked account. :param str robotid: the vunetid-equivalent to be checked :returns: fullname, uid, vanderbilt_email, and titles :rtype: dict """ entry = self._fetch_robotid(robotid) result = {} result['fullname'] = str(entry.cn) result['uid'] = int(entry.uidNumber.value) result['vanderbilt_email'] = str(entry.mail) result['titles'] = entry.title.values return result
[docs] def is_robot_locked(self, robotid): """ Return true if the robot is locked/inactive. A robot is considered locked if it has "locked" in its titles. :param str robotid: vunetid-equivalent for the robot :returns: True if locked/inactive :rtype: bool """ entry = self._fetch_robotid(robotid) return 'locked' in entry.title.values
[docs] def set_robot_lock(self, cn, value): """ Set the robot account lock to True or False for the given full or descriptive name of the account. This is equivelent to a VUNetID being locked in the VUIT LDAP. :param str cn: Full or descriptive name for the robot to (un)lock :param bool value: Locked (True) or unlocked (False) :returns: True on success, False on failure :rtype: bool """ dn = ( 'cn={},ou=Robots,dc=accre,dc=vanderbilt,dc=edu'.format(cn) ) if value: return self.conn.modify( dn, {'title': [(ldap3.MODIFY_ADD, ['locked'])]} ) else: return self.conn.modify( dn, {'title': [(ldap3.MODIFY_DELETE, ['locked'])]} )
[docs] def robot_exists(self, robotid): """ Check if the given vunetid-equivalent exists on the server :param str externalid: the vunetid-equivalent to be checked :returns: True if the vunetid-equivalent exists :rtype: bool """ try: self._fetch_robotid(robotid) return True except ACCREValueError: return False
[docs] def add_robot(self, *, cn, sn, vunetid, uid, mail, title, testrobot=False, legacy_uid=False ): """ Add a new robot to the directory. This record will be used as an equivalent to VandyLDAP for verifying that the robot has a valid "vunetid". A corresponding user record must be created to give a robot access to the cluster. The assigned vunetid must be in the valid robot range (9200-9999) and must not conflict with an existing robot user, unless it is a test user and in the test robot range (9100-9199) :param str cn: The full name or descriptive name of the robot :param str sn: The surname or shorter name of the robot :param str vunetid: The vunetid-equivalent (posix username) for the robot. :param int uid: Numerical posix uid for the robot, must be in the valid robot UID range (9100-9999) :param str mail: Vanderilt-equivalent email address for the robot :param str title: Title to indicate general purpose of the robot, for example "testuser" for a test user :param bool testrobot: If true, this robot is a test robot, must be in the test robot UID range (9100-9199) and may have a UID conflict :param bool legacy_uid: If true, allow for a robot outside of the valid robot UID range. The new robot UID must still not conflict with other robots. Use with caution as this may shadow actual VUNetID UIDs. :returns: True on success, False on failure :rtype: bool """ if not testrobot and not legacy_uid and not 9200 <= uid <= 9999: raise ACCREValueError('Robot UID must be in the range 9200-9999') if testrobot and not 9100 <= uid <= 9199: raise ACCREValueError( 'Test bobot UID must be in the range 9100-9199' ) if not testrobot: self.conn.search( 'ou=Robots,dc=accre,dc=vanderbilt,dc=edu', '(uid=*)', attributes=['uid', 'uidNumber'] ) robots = { e['uidNumber'].value: e['uid'].value for e in self.conn.entries } if uid in robots: raise ACCREValueError( 'UID {0} is already in use by {1}' .format(uid, robots[uid]) ) props = { "cn": cn, "sn": sn, "uid": vunetid, "uidNumber": uid, "gidNumber": 10000, "homeDirectory": '/home/{}'.format(vunetid), "mail": mail, "title": title } dn = ( "cn={},ou=Robots,dc=accre,dc=vanderbilt,dc=edu" .format(cn) ) return self.conn.add( dn, [ "top", "person", "inetOrgPerson", "posixAccount", "organizationalPerson" ], props )
[docs] def delete_robot(self, cn): """ Remove the specified robot record from the directory. :param str cn: the full/descriptive name of the robot to be removed :returns: True on success, False on failure :rtype: bool """ dn = ( "cn={},ou=Robots,dc=accre,dc=vanderbilt,dc=edu" .format(cn) ) return self.conn.delete(dn)
[docs] def list_robots(self, active=False): """ Return a list of usernames of all registered robots currently in the directory. :param bool active: Only list active robots if set to true, where a robot is defined as active if it has a title of 'locked' :returns: All usernames of robots in the directory :rtype: list(str) """ self.conn.search( 'ou=Robots,dc=accre,dc=vanderbilt,dc=edu', '(uid=*)', attributes=['uid', 'title'] ) if active: return [ str(self.conn.entries[idx]['uid']) for idx in range(len(self.conn.entries)) if 'locked' not in self.conn.entries[idx]['title'] ] else: return [ str(self.conn.entries[idx]['uid']) for idx in range(len(self.conn.entries)) ]
[docs]class VUDS(BaseLDAP): """ Client for VUDS (Vanderbilt Directory Services) using the ACCRE resource ID with administrative privileges to the ACCRE_Users group in DS. Note that this client has also inherited the functionality of the old "VandyLDAP" client which queried eLDAP. As eLDAP has now been retired, VandyLDAP is now a reference to this class which provides the equivalent API as well as possible but querying VUDS instead. In order to improve performance when checking multiple VUNetID entries, instantiate the client with caching=True to avoid repeatedly fetching the same ID. """ # FOR DS (old shared AD) #config_section = 'vuit-dsadmin' config_section = 'vuit-vudsadmin' users_ou = CONFIG[config_section]['accre_users_ou'] vunetid_re = re.compile( r'CN=([^,]+),OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu' )
[docs] def list_accre_users(self): """ Get a list of current accre user VUNetIDs from the ACCRE_Users group in DS. :returns: VUNetIDs of all users in the ACCRE_Users DS group :rtype: list(str) """ self.conn.search( self.users_ou, '(CN=ACCRE_Users)', attributes=['member'] ) dns = self.conn.entries[0]['member'] return [self.vunetid_re.search(dn).group(1) for dn in dns]
[docs] def add_accre_users(self, new_users): """ Add new user VUNetIDs from a list to the ACCRE_Users group in DS. Note that a single invalid VUNetID will cause the whole operation to fail, so adding one at a time may be preferable. :param list(str) new_users: List of new user VUNetIDs to add :returns: True on operation success, False on failure :rtype: bool """ new_dns = [ f'CN={v},OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu' for v in new_users ] return self.conn.modify( f'CN=ACCRE_Users,{self.users_ou}', {'member': [(ldap3.MODIFY_ADD, new_dns)]} )
[docs] def delete_accre_users(self, del_users): """ Remove existing user VUNetIDs from a list to the ACCRE_Users group in DS. :param list(str) del_users: List of user VUNetIDs to delete :returns: True on operation success, False on failure :rtype: bool """ del_dns = [ f'CN={v},OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu' for v in del_users ] return self.conn.modify( f'CN=ACCRE_Users,{self.users_ou}', {'member': [(ldap3.MODIFY_DELETE, del_dns)]} )
[docs] def is_active(self, vunetid): """ Check if the given vunetid is an active employee, student, or affiliate of the university .. deprecated:: This logic is no longer valid for VUNet affiliate accounts, do not use :param str vunetid: the vunetid to be checked :returns: True if the vunetid is active :rtype: bool """ try: entry = self._fetch_vunetid(vunetid) except ACCREValueError: return False employee = _get_str(entry, 'vanderbiltPersonActiveEmployee') student = _get_str(entry, 'vanderbiltPersonActiveStudent') affiliate = _get_str(entry, 'eduPersonAffiliation') if employee == 'Y' or student == 'ACTIVE' or affiliate == 'affiliate': return True return False
[docs] def is_locked(self, vunetid): """ Returns False. In the past, this method checked if the given vunetid is locked due to becoming an inactive VUnetID or having an expired VUNet password. .. deprecated:: This logic is no longer valid following the eLDAP to VUDS transition as there is no nsaccountlocked property, do not use. :param str vunetid: the vunetid to be checked :returns: False :rtype: bool """ return False return self.conn.compare( 'uid={0},OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu'.format(vunetid), 'nsaccountlock', True )
[docs] def exists(self, vunetid): """ Check if the given vunetid exists on the server :param str vunetid: the vunetid to be checked :returns: True if the vunetid exists :rtype: bool """ try: self._fetch_vunetid(vunetid) return True except ACCREValueError: return False
[docs] def exists_numeric(self, uid): """ Check if the given numeric UID corresponds to a vunetid on the server :param str uid: the numeric UID to be checked :returns: True if the UID exists :rtype: bool """ try: self._fetch_vunetid_numeric(uid) return True except ACCREValueError: return False
[docs] def info(self, vunetid): """ Return a dictionary of information about the specified vunetid including the full name (fullname), uid (uid) and Vanderbilt email (vanderbilt_email). :param str vunetid: the vunetid to be checked :returns: fullname, uid, and vanderbilt_email :rtype: dict """ entry = self._fetch_vunetid(vunetid) result = {} result['fullname'] = str(entry.displayName) result['uid'] = int(entry.uidNumber.value) # Some VandyLDAP accounts are missing the mail attribute try: result['vanderbilt_email'] = str(entry.mail) except ( ldap3.core.exceptions.LDAPCursorError, ldap3.core.exceptions.LDAPKeyError ): result['vanderbilt_email'] = '' return result
[docs] def list_groups_by_gid(self, gid): """ Lists all group names in VUDS corresponding to a given numeric gid. In principle this should return a list of no more than one group, but in the past there have been gid collisions. :param int gid: the numeric gid to be checked :returns: names of all groups with the numeric gid in vuds :rtype: list(str) """ self.conn.search( 'DC=vuds,DC=vanderbilt,DC=edu', f'(gidnumber={gid})', attributes=['cn'] ) # although pathogical, a single entry could have more than one # cn, hence the need to loop over entries to produce a flat list result = [] for entry in self.conn.entries: names = entry.entry_attributes_as_dict.get('cn', []) result.extend(names) return result
[docs] def info_numeric(self, uid): """ Return a dictionary of information about the specified vunetid including the full name (fullname), uid (uid) and Vanderbilt email (vanderbilt_email). :param str uid: the numeric UID to be checked :returns: fullname, uid, and vanderbilt_email :rtype: dict """ entry = self._fetch_vunetid_numeric(uid) record = entry.entry_attributes_as_dict result = {} # FOR DS (old shared AD) # result['uid'] = str(entry.cn.value) result['uid'] = str(record.get("uid", ["(NONE)"])[0]) result['fullname'] = str(record.get("displayName", ["(NONE)"])[0]) result['vanderbilt_email'] = str(record.get("mail", ["(NONE)"])[0]) result['org'] = str(record.get("o", ["(NONE)"])[0]) result['givenName'] = str(record.get("givenName", ["(NONE)"])[0]) result['middleName'] = str(record.get("middleName", ["(NONE)"])[0]) result['sn'] = str(record.get("sn", ["(NONE)"])[0]) return result
def _fetch_vunetid(self, vunetid): """ Fetch and return a vunetid from the server. If caching is true and it is in the cache, return the cached value. Raise an exception if the vunetid is not found on the server. """ return self._fetch_user(vunetid, 'OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu') def _fetch_vunetid_numeric(self, uid): """ Fetch and return a vunetid from the server. If caching is true and it is in the cache, return the cached value. Raise an exception if the vunetid is not found on the server. """ # FOR DS (old shared AD) # return self._fetch_num_uid(uid, 'CN=Users,DC=ds,DC=vanderbilt,DC=edu') return self._fetch_num_uid(uid, 'OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu')
# For script compatibility, the old VandyLDAP class is now a reference # to VUDS which has a superset of the old VandyLDAP API. VandyLDAP # was removed when the old eLDAP service was retired by VUIT VandyLDAP = VUDS def _get_str(entry, attr): """ Safely retrieve an attribute from an ldap3.abstract.entry.Entry as a string, or return the empty string if it doesn't exist You can't just do a getattr with a default due to the way ldap3 Entry objects implement the dunder methods. """ try: return str(getattr(entry, attr)) except ( ldap3.core.exceptions.LDAPCursorError, ldap3.core.exceptions.LDAPKeyError ): return '' def _int_or_empty(value): """ Return an integer from the given value, or the empty string if value is an empty string. Raise an exception otherwise """ try: return int(value) except ValueError: if value == '': return value raise ACCREValueError( 'Value {0} is not an empty string or a valid int.'.format(value) )
[docs]def vandy_cli(): """ CLI entry point for Vanderbilt VUDS LDAP access Run ``vandyldap --help`` for usage """ description = 'Commands for accessing Vanderbilt LDAP' parser = accre_argparser('vandyldap', description=description) subparsers = parser.add_subparsers() parser_check = subparsers.add_parser( 'check', help="Check the status and get information about a vunetid" ) parser_check.add_argument('vunetid', type=str) parser_check.set_defaults(func=_vandy_cli_check) parser_checkuid = subparsers.add_parser( 'checkuid', help="Check the status and get information about a vunetid by looking up the numeric UID" ) parser_checkuid.add_argument('num_uid', type=str) parser_checkuid.set_defaults(func=_vandy_cli_checkuid) parser_batchuid = subparsers.add_parser( 'batchuid', help="Take file with one UID number per line and generate a TSV file with info about each UID" ) parser_batchuid.add_argument('uid_list_file', type=str) parser_batchuid.set_defaults(func=_vandy_cli_batchuid) args = parser.parse_args() if hasattr(args, 'func') and args.func: args.func(args) else: parser.print_help()
def _vandy_cli_check(args): """ Print out information about the specified VUNetID. If the VUNetID does not exist on the LDAP server, print an error message and return exit code 1. """ with VUDS(caching=True) as client: if not client.exists(args.vunetid): print('The VUNetID {0} does not exist'.format(args.vunetid)) sys.exit(1) if client.is_active(args.vunetid): active = 'Yes' else: active = 'No' info = client.info(args.vunetid) print('Full Name: {0}'.format(info['fullname'])) print('Active: {0}'.format(active)) print('UID: {0}'.format(info['uid'])) print('Vanderbilt Email: {0}'.format(info['vanderbilt_email'])) sys.exit(0) def _vandy_cli_checkuid(args): """ Print out information about the VUNetID corresponding to the provided numeric UID. If the VUNetID does not exist on the LDAP server, print an error message and return exit code 1. """ with VUDS(caching=True) as client: if not client.exists_numeric(args.num_uid): print('The (numeric) UID {0} does not exist'.format(args.num_uid)) sys.exit(1) info = client.info_numeric(args.num_uid) print('Full Name: {0}'.format(info['fullname'])) print('UID: {0}'.format(info['uid'])) print('Vanderbilt Email: {0}'.format(info['vanderbilt_email'])) sys.exit(0) def _vandy_cli_batchuid(args): """ Take file with one UID number per line and generate a TSV file with info about each UID """ print('\t'.join(['uidNumber','uid','o(org)','givenName','middleName','sn','mail','displayName'])) with VUDS(caching=True) as client: with open(args.uid_list_file) as uidfile: for line in uidfile: num_uid = int(line) if not client.exists_numeric(num_uid): print('{0}\t{1}'.format(num_uid,'NOT_FOUND')) continue info = client.info_numeric(num_uid) print('\t'.join([str(num_uid),info['uid'],info['org'],info['givenName'],info['middleName'],info['sn'],info['vanderbilt_email'],info['fullname']])) sys.exit(0)
[docs]def accre_cli(): """ CLI entry point for Vanderbilt LDAP access Run ``accreldap --help`` for usage """ description = 'Commands for accessing ACCRE LDAP' parser = accre_argparser('accreldap', description=description) subparsers = parser.add_subparsers() parser_user = subparsers.add_parser( 'user', help="Check the status of or get information about an ACCRE user" ) parser_user.add_argument('username', type=str) parser_user.add_argument('--change-group', action='store', default=None, help="New primary group to set the user to" ) parser_user.add_argument('--add-group', action='store', default=None, help="Secondary group (or comma delimited list of groups) to add the user to" ) parser_user.add_argument('--remove-group', action='store', default=None, help="Secondary group (or comma delimited list of groups) to remove the user from" ) parser_user.set_defaults(func=_accre_cli_user) args = parser.parse_args() if hasattr(args, 'func') and args.func: args.func(args) else: parser.print_help()
def _accre_cli_user(args): """ Print out information about the specified username, and optionally make changes to the user. """ user = args.username rootdn = False if any([args.add_group, args.remove_group, args.change_group]): rootdn=True with ACCRELDAP(rootdn=rootdn) as client: if not client.exists(user): print(RedStr('Error: User {0} does not exist'.format(user))) sys.exit(1) if args.change_group: if not client.group_exists(args.change_group): print(RedStr('Error: invalid primary group')) sys.exit(1) result = client.change_user_group(user, args.change_group) if not result: print(RedStr('Error: failed to change group')) sys.exit(1) print(GreenStr('Successfully changed primary group')) if args.add_group: groups = args.add_group.split(',') if not all(client.group_exists(g) for g in groups): print(RedStr('Error: invalid secondary group')) sys.exit(1) for group in groups: client.add_group_member(group, user) print(GreenStr('Added {0} to group {1}'.format( user, group))) if args.remove_group: groups = args.remove_group.split(',') if not all(client.group_exists(g) for g in groups): print(RedStr('Error: invalid secondary group')) sys.exit(1) for group in groups: client.remove_group_member(group, user) print(GreenStr('Removed {0} from group {1}'.format( user, group))) record = client.user_record(user) _print_user_record(record) sys.exit(0) def _print_user_record(record): precord = copy.deepcopy(record) del precord['attributes']['objectClass'] if 'userPassword' in precord['attributes']: del precord['attributes']['userPassword'] print('dn: {0}'.format(precord['dn'])) for attr in precord['attributes']: print(' {0}: {1}'.format( attr, ', '.join(str(x) for x in precord['attributes'][attr]) )) # File format hints for vim, if the modeline option is enabled (:set modeline=on) then 'modelines' # number of lines (def:5) are searched at the start and end of file and settings found are adopted. # Format: convert tabs to 4 spaces # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab