"""
Tools to interface with the ACCRE internal and Vanderbilt LDAP servers
including a client classes and a cli entry point.
"""
import copy
import crypt
import datetime
import json
import re
import sys
import ldap3
from accre.config import get_config
from accre.exceptions import ACCREValueError, ACCREKeyError
import accre.util
from accre.util import (
get_posixuser, get_posixgroup, get_shadowuser,
PosixUser, ShadowUser, PosixGroup,
accre_argparser,
RedStr, GreenStr
)
CONFIG = get_config()
# Unset password regexp, used to detect unset passwords in /etc/group
# which are usually 'x' but could potentially be '*' or '!!'
UNSET_PASSWORD_RE = re.compile(r'^(x|!|!!|\*)$')
[docs]class BaseLDAP:
"""
Base LDAP client for configured services
In order to improve performance when checking multiple VUNetID entries,
instantiate the client with caching=True to avoid repeatedly fetching
the same ID.
"""
config_section = ''
def __init__(self, caching=False, ldap_id=None, auth=None, rootdn=False):
cacert = CONFIG[self.config_section].get('cacert', None)
if rootdn:
try:
ldap_id = CONFIG[self.config_section]['rootdn_id']
auth = CONFIG[self.config_section]['rootdn_auth']
except KeyError:
raise ACCREKeyError(
'Root dn credentials are not set for your configuration. '
'Ensure you are authorized and are on the correct host.'
)
self.ldap_id = ldap_id
self.auth = auth
if self.ldap_id is None:
self.ldap_id = CONFIG[self.config_section]['id']
self.auth = CONFIG[self.config_section]['auth']
if cacert is None:
self.server = ldap3.Server(
CONFIG[self.config_section]['server'],
use_ssl=True,
get_info=ldap3.ALL
)
else:
tls = ldap3.Tls(ca_certs_file=cacert)
self.server = ldap3.Server(
CONFIG[self.config_section]['server'],
use_ssl=True,
get_info=ldap3.ALL,
tls=tls
)
self.conn = None
self.caching = caching
self.userids = {}
self.groupids = {}
self.num_uids = {}
self.num_gids = {}
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.disconnect()
[docs] def connect(self):
"""
Bind a new connection to the configured LDAP server
"""
if self.conn and self.conn.bound:
self.disconnect()
# Needed for unit testing - connection to a mock server requires
# a special call
if self.server.host == 'unit_test_server':
self.conn = ldap3.Connection(
self.server,
self.ldap_id,
self.auth,
client_strategy=ldap3.MOCK_SYNC
)
self.conn.bind()
return
self.conn = ldap3.Connection(
self.server,
self.ldap_id,
self.auth
)
self.conn.bind()
[docs] def disconnect(self):
"""
Unbind the current connection to the configured LDAP server
"""
if self.conn is not None:
self.conn.unbind()
def _fetch_user(self, user, search_path):
"""
Fetch and return a posix user from the server. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the user is not found on the server.
"""
if self.caching and user in self.userids:
if self.userids[user] is not None:
return self.userids[user]
else:
raise ACCREValueError(
'User {0} was not found on the server.'.format(user)
)
self.conn.search(
search_path,
'(uid={0})'.format(user),
attributes=['*']
)
if not self.conn.entries:
if self.caching:
self.userids[user] = None
raise ACCREValueError(
'User {0} was not found on the server.'.format(user)
)
entry = self.conn.entries[0]
if self.caching:
self.userids[user] = entry
return entry
def _fetch_group(self, group, search_path):
"""
Fetch and return a posix group from the server. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the user is not found on the server.
"""
if self.caching and group in self.groupids:
if self.groupids[group] is not None:
return self.groupids[group]
else:
raise ACCREValueError(
'Group {0} was not found on the server.'.format(group)
)
self.conn.search(
search_path,
'(cn={0})'.format(group),
attributes=['*']
)
if not self.conn.entries:
if self.caching:
self.groupids[group] = None
raise ACCREValueError(
'Group {0} was not found on the server.'.format(group)
)
entry = self.conn.entries[0]
if self.caching:
self.groupids[group] = entry
return entry
def _fetch_num_uid(self, uid, search_path):
"""
Fetch and return a posix user from the server via numeric UID. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the user is not found on the server.
"""
if self.caching and uid in self.num_uids:
if self.num_uids[uid] is not None:
return self.num_uids[uid]
else:
raise ACCREValueError(
'UID {0} was not found on the server.'.format(uid)
)
self.conn.search(
search_path,
'(uidNumber={0})'.format(uid),
attributes=['*']
)
if not self.conn.entries:
if self.caching:
self.num_uids[uid] = None
raise ACCREValueError(
'UID {0} was not found on the server.'.format(uid)
)
entry = self.conn.entries[0]
if self.caching:
self.num_uids[uid] = entry
return entry
def _fetch_num_gid(self, gid, search_path):
"""
Fetch and return a posix group from the server via numeric GID. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the user is not found on the server.
"""
if self.caching and gid in self.num_gids:
if self.num_gids[gid] is not None:
return self.num_gids[gid]
else:
raise ACCREValueError(
'GID {0} was not found on the server.'.format(gid)
)
self.conn.search(
search_path,
'(gidNumber={0})'.format(gid),
attributes=['*']
)
if not self.conn.entries:
if self.caching:
self.num_gids[gid] = None
raise ACCREValueError(
'GID {0} was not found on the server.'.format(gid)
)
entry = self.conn.entries[0]
if self.caching:
self.num_gids[gid] = entry
return entry
[docs]class ACCRELDAP(BaseLDAP):
"""
Client for the ACCRE internal LDAP Service.
In order to improve performance when checking multiple VUNetID entries,
instantiate the client with caching=True to avoid repeatedly fetching
the same ID.
"""
config_section = 'accreldap'
[docs] def user_record(self, vunetid):
"""
dump record of an accre user into a dictionary with all LDAP attributes
:param str vunetid: ACCRE user username/VUNetID
:returns: Dictionary with full LDAP record information
"""
self.conn.search(
'ou=People,dc=accre,dc=vanderbilt,dc=edu',
'(uid={})'.format(vunetid),
attributes=['*']
)
return json.loads(self.conn.entries[0].entry_to_json())
[docs] def add_user(self, *, username, passwd, uid,
gid=None,
homedir=None,
shell=None,
fullname='',
lastpwchange=None,
shadowmin=0,
shadowmax=99999,
shadowwarn=7
):
"""
Add a new user account to the directory. The username, password, and
uid are required. The gid will default to the uid, the shell to
bash, the home directory to /home/<username>, and the last password
change to the current date if not set.
If an optional attribute is given in as the empty string, then
that attribute will not be set in the new LDAP entity.
:param str username: the username of the new user (required)
:param str passwd: the LDAP-formatted passwd hash for the new user
(required)
:param int uid: the numeric UID of the new user (required)
:param int gid: the numeric group ID of the new user
:param str shell: the shell to use for the ner user
:param str fullname: the real (full) name of the new user
:param int lastpwchange: the number of days since the epoch of the
last password change
:param int shadowmin: minimum number of days before the user password
can be changed
:param int shadowmax: number of days since the last change before
a user password expires
:param int shadowwarn: number of days before expiring that a user
will be warned to change their password
:returns: True on success, False on failure
:rtype: bool
"""
if gid is None:
gid = uid
if homedir is None:
homedir = '/home/{0}'.format(username)
if shell is None:
shell = '/bin/bash'
if lastpwchange is None:
lastpwchange = (
(accre.util.utcnow() - datetime.datetime(1970,1,1)).days
)
props = {
"cn": username,
"uidNumber": uid,
"gidNumber": gid,
"homeDirectory": homedir,
"loginShell": shell,
"gecos": fullname,
"userPassword": passwd,
"shadowLastChange": lastpwchange,
"shadowMin": shadowmin,
"shadowMax": shadowmax,
"shadowWarning": shadowwarn
}
for key in list(props.keys()):
if props[key] == '':
del props[key]
dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username)
return self.conn.add(
dn,
["top", "account", "posixAccount", "shadowAccount"],
props
)
[docs] def add_local_user(self, username):
"""
Add a user account record from a local user specified in
/etc/passwd and /etc/shadow. This method requires that you are
root.
:param str username: Username to import to LDAP from the local files
:returns: True on success, False on failure
:rtype: bool
"""
posix = get_posixuser(username)
shadow = get_shadowuser(username)
return self.add_user(
username=posix.name,
passwd='{{CRYPT}}{0}'.format(shadow.password),
uid=posix.uid,
gid=posix.gid,
fullname=posix.gecos,
homedir=posix.homedir,
shell=posix.shell,
lastpwchange=shadow.lastchange,
shadowmin=shadow.min,
shadowmax=shadow.max,
shadowwarn=shadow.warn
)
[docs] def delete_user(self, username):
"""
Remove the specified user account from the directory.
:param str username: the username to be removed
:returns: True on success, False on failure
:rtype: bool
"""
dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username)
return self.conn.delete(dn)
[docs] def change_user_password(self, username, new_password):
"""
Sets the password of the user to the value ``new_password``. Note
that this is the actual un-hashed password. The password is hashed
using SHA512 with a salt of length 16. No strength checking is
performed by this function. The shadowLastChange is also updated
to the current date for the user.
:param str username: the username to receive a password change
:param str new_password: the unhashed value of the new password
:returns: True on success, False on failure
:rtype: bool
"""
dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username)
newhash = crypt.crypt(new_password, salt=crypt.METHOD_SHA512)
# hashing with METHOD_SHA512 may fail silently on systems that
# do not support this in crypt. Check the length as a smoke test.
if len(newhash) != 106:
raise Exception(
'Error hashing new password, SHA512 unsupported by '
'crypt on this system.'
)
newhash = '{CRYPT}' + newhash
lastchange = (
(accre.util.utcnow() - datetime.datetime(1970,1,1)).days
)
return self.conn.modify(
dn,
{
'userPassword': [(ldap3.MODIFY_REPLACE, [newhash])],
'shadowLastChange': [(ldap3.MODIFY_REPLACE, [lastchange])]
}
)
[docs] def change_user_shell(self, username, shell):
"""
Sets the login shell of the user to the value ``new_password``. Note
that this is the full path to the shell executable file.
:param str username: the username to receive a password change
:param str shell: the path to the new shell executable
:returns: True on success, False on failure
:rtype: bool
"""
dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username)
return self.conn.modify(
dn,
{'loginShell': [(ldap3.MODIFY_REPLACE, [shell])]}
)
[docs] def change_user_group(self, username, group):
"""
Sets the primary group of the user to the numeric GID of the
given group name.
:param str username: the username to receive a primary group change
:param str group: the name of the new primary group
:returns: True on success, False on failure
:rtype: bool
"""
dn = "uid={},ou=People,dc=accre,dc=vanderbilt,dc=edu".format(username)
self.conn.search(
'ou=Groups,dc=accre,dc=vanderbilt,dc=edu',
'(cn={0})'.format(group),
attributes=['cn', 'gidNumber']
)
try:
gid = self.conn.entries[0].gidNumber.value
except IndexError:
return False
return self.conn.modify(
dn,
{'gidNumber': [(ldap3.MODIFY_REPLACE, [gid])]}
)
[docs] def list_users(self, active=False):
"""
Return a list of usernames of all cluster users currently
in the directory.
:param bool active: Only list active users if set to true,
where a user is defined as active if its shell is not
/bin/false or /sbin/nologin
:returns: All usernames of cluster users in the directory
:rtype: list(str)
"""
nologins = ['/bin/false', '/sbin/nologin']
self.conn.search(
'ou=People,dc=accre,dc=vanderbilt,dc=edu',
'(uid=*)',
attributes=['uid', 'loginShell']
)
if active:
return [
str(self.conn.entries[idx]['uid'])
for idx in range(len(self.conn.entries))
if self.conn.entries[idx]['loginShell'] not in nologins
]
else:
return [
str(self.conn.entries[idx]['uid'])
for idx in range(len(self.conn.entries))
]
[docs] def add_group(self, *, name, gid,
passwd='',
description='',
members=()
):
"""
Add a new posix group to the directory. The group name, and gid
are required. The password, description, and members will default
to being unset in the directory.
If an optional attribute is given in as the empty string, then
that attribute will not be set in the new LDAP entity, except
for the members attribute which will not be set if if is an
empty iterable.
:param str name: the name of the new group (required)
:param int gid: the numeric group ID
:param str passwd: the group password hash (usually unset)
:param list(str) members: member accounts of the group
:returns: True on success, False on failure
:rtype: bool
"""
if not members:
members = ''
props = {
"cn": name,
"gidNumber": gid,
"description": description,
"userPassword": passwd,
"memberUid": members
}
for key in list(props.keys()):
if props[key] == '':
del props[key]
dn = "cn={},ou=Groups,dc=accre,dc=vanderbilt,dc=edu".format(name)
return self.conn.add(
dn,
["top", "posixGroup"],
props
)
[docs] def add_local_group(self, name):
"""
Add a user group record from a local group specified in
/etc/group.
:param str name: Group name to import to LDAP from the local files
:returns: True on success, False on failure
:rtype: bool
"""
group = get_posixgroup(name)
if group.password and not UNSET_PASSWORD_RE.match(group.password):
password = '{{CRYPT}}{0}'.format(group.password)
else:
password = ''
return self.add_group(
name=group.name,
passwd=password,
gid=group.gid,
members=group.members
)
[docs] def delete_group(self, name):
"""
Remove the specified posix group from the directory.
:param str name: the group name to be removed
:returns: True on success, False on failure
:rtype: bool
"""
dn = "cn={},ou=Groups,dc=accre,dc=vanderbilt,dc=edu".format(name)
return self.conn.delete(dn)
[docs] def list_groups(self):
"""
Return a list of group names of all posix groups currently
in the directory.
:returns: All names of posix groups in the directory
:rtype: list(str)
"""
self.conn.search(
'ou=Groups,dc=accre,dc=vanderbilt,dc=edu',
'(cn=*)',
attributes=['cn']
)
return [
str(self.conn.entries[idx]['cn'])
for idx in range(len(self.conn.entries))
]
[docs] def list_group_gids(self):
"""
Return a list of group names and corresponding numeric gids as
tuples.
:returns: All names and gids of posix groups in the directory
:rtype: list(tuple(str, int))
"""
self.conn.search(
'ou=Groups,dc=accre,dc=vanderbilt,dc=edu',
'(cn=*)',
attributes=['cn', 'gidNumber']
)
return [
(entry['cn'].values[0], entry['gidNumber'].values[0])
for entry in self.conn.entries
]
[docs] def group_membership(self):
"""
Return a dict keyed by group names with a list of members as
the value for each group.
:returns: Membership of all groups
:rtype: dict(str,list(str))
"""
self.conn.search(
'ou=Groups,dc=accre,dc=vanderbilt,dc=edu',
'(cn=*)',
attributes=['cn','memberUid']
)
result = {}
for entry in self.conn.entries:
result[_get_str(entry, 'cn')] = entry['memberUid'].values
return result
[docs] def add_group_member(self, group, user):
"""
Make the specified group a secondary group for the specified user.
:param str group: group to add the user to
:param str user: user to add to the group
:returns: True on success
:rtype: bool
"""
return self.conn.modify(
'cn={0},ou=Groups,dc=accre,dc=vanderbilt,dc=edu'.format(group),
{'memberUid': [(ldap3.MODIFY_ADD, [user])]}
)
[docs] def remove_group_member(self, group, user):
"""
Remove the specified user from the specified group. Note: this method
does not work for the primary group of a user.
:param str group: group to remove the user from
:param str user: user to remove from the group
:returns: True on success
:rtype: bool
"""
return self.conn.modify(
'cn={0},ou=Groups,dc=accre,dc=vanderbilt,dc=edu'.format(group),
{'memberUid': [(ldap3.MODIFY_DELETE, [user])]}
)
[docs] def exists(self, accreid):
"""
Check if the given ACCRE ID exists on the server
:param str accreid: the ACCRE ID to be checked
:returns: True if the ACCRE ID exists
:rtype: bool
"""
try:
self._fetch_accreid(accreid)
return True
except ACCREValueError:
return False
[docs] def posixuser(self, accreid):
"""
Return a PosixUser named tuple with the user information
equivalent to an /etc/passwd entry for the given ACCRE ID
:param str accreid: the ACCRE ID to be checked
:returns: Posix user information for the user
:rtype: accre.util.PosixUser
"""
entry = self._fetch_accreid(accreid)
return PosixUser(
name=_get_str(entry, 'cn'),
password='x',
uid=int(_get_str(entry, 'uidNumber')),
gid=int(_get_str(entry, 'gidNumber')),
gecos=_get_str(entry, 'gecos'),
homedir=_get_str(entry, 'homeDirectory'),
shell=_get_str(entry, 'loginShell')
)
[docs] def shadowuser(self, accreid):
"""
Return a ShadowUser named tuple with the user information
equivalent to an /etc/shadow entry for the given ACCRE ID.
Note that only the most privileged LDAP users will be able
to access the actual password hash. Also note that inactive,
expire, and res are unused fields and will be set to empty
strings.
:param str accreid: the ACCRE ID to be checked
:returns: Shadow user information for the user
:rtype: accre.util.ShadowUser
"""
entry = self._fetch_accreid(accreid)
password = entry['userPassword'].raw_values[0].decode('ascii')
if password.startswith('{CRYPT}'):
password = password[7:]
return ShadowUser(
name=_get_str(entry, 'cn'),
password=password,
lastchange=_int_or_empty(_get_str(entry, 'shadowLastChange')),
min=_int_or_empty(_get_str(entry, 'shadowMin')),
max=_int_or_empty(_get_str(entry, 'shadowMax')),
warn=_int_or_empty(_get_str(entry, 'shadowWarning')),
inactive='',
expire='',
res=''
)
[docs] def posixgroup(self, accregroup):
"""
Return a PosixGroup named tuple with the group information
equivalent to an /etc/group entry for the given ACCRE group
:param str accregroup: the ACCRE group to be checked
:returns: Posix group information for the user
:rtype: accre.util.PosixGroup
"""
entry = self._fetch_accregroup(accregroup)
try:
members = tuple(entry['memberUid'].values)
except (
ldap3.core.exceptions.LDAPCursorError,
ldap3.core.exceptions.LDAPKeyError
):
members = ()
return PosixGroup(
name=_get_str(entry, 'cn'),
password='x',
gid=int(_get_str(entry, 'gidNumber')),
members=members
)
[docs] def group_exists(self, accregroup):
"""
Check if the given ACCRE group exists on the server
:param str accregroup: the ACCRE group name to be checked
:returns: True if the ACCRE group exists
:rtype: bool
"""
try:
self._fetch_accregroup(accregroup)
return True
except ACCREValueError:
return False
def _fetch_accreid(self, accreid):
"""
Fetch and return a accre user from the server. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the accreid is not found on the server.
"""
return self._fetch_user(
accreid,
'ou=People,dc=accre,dc=vanderbilt,dc=edu'
)
def _fetch_accregroup(self, accregroup):
"""
Fetch and return an accre group from the server. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the accre group is not found on the server.
"""
return self._fetch_group(
accregroup,
'ou=Groups,dc=accre,dc=vanderbilt,dc=edu'
)
def _fetch_robotid(self, robotid):
"""
Fetch and return an robot from the server.
Raise an exception if the robot is not found on the server.
Caching is not used for robots
"""
self.conn.search(
'ou=Robots,dc=accre,dc=vanderbilt,dc=edu',
'(uid={0})'.format(robotid),
attributes=['*']
)
if not self.conn.entries:
raise ACCREValueError(
'Robot {0} was not found on the server.'.format(robotid)
)
return self.conn.entries[0]
[docs] def robot_info(self, robotid):
"""
Return a dictionary of information about the specified
robot vunetid-equivalent including the
full name (fullname), uid (uid),
Vanderbilt-equivalent email (vanderbilt_email), and titles,
where titles indicate the purpose(s) of the robot and if it
is a locked account.
:param str robotid: the vunetid-equivalent to be checked
:returns: fullname, uid, vanderbilt_email, and titles
:rtype: dict
"""
entry = self._fetch_robotid(robotid)
result = {}
result['fullname'] = str(entry.cn)
result['uid'] = int(entry.uidNumber.value)
result['vanderbilt_email'] = str(entry.mail)
result['titles'] = entry.title.values
return result
[docs] def is_robot_locked(self, robotid):
"""
Return true if the robot is locked/inactive. A robot is
considered locked if it has "locked" in its titles.
:param str robotid: vunetid-equivalent for the robot
:returns: True if locked/inactive
:rtype: bool
"""
entry = self._fetch_robotid(robotid)
return 'locked' in entry.title.values
[docs] def set_robot_lock(self, cn, value):
"""
Set the robot account lock to True or False for the given
full or descriptive name of the account. This is equivelent
to a VUNetID being locked in the VUIT LDAP.
:param str cn: Full or descriptive name for the robot to
(un)lock
:param bool value: Locked (True) or unlocked (False)
:returns: True on success, False on failure
:rtype: bool
"""
dn = (
'cn={},ou=Robots,dc=accre,dc=vanderbilt,dc=edu'.format(cn)
)
if value:
return self.conn.modify(
dn, {'title': [(ldap3.MODIFY_ADD, ['locked'])]}
)
else:
return self.conn.modify(
dn, {'title': [(ldap3.MODIFY_DELETE, ['locked'])]}
)
[docs] def robot_exists(self, robotid):
"""
Check if the given vunetid-equivalent exists on the server
:param str externalid: the vunetid-equivalent to be checked
:returns: True if the vunetid-equivalent exists
:rtype: bool
"""
try:
self._fetch_robotid(robotid)
return True
except ACCREValueError:
return False
[docs] def add_robot(self, *, cn, sn, vunetid, uid, mail, title,
testrobot=False,
legacy_uid=False
):
"""
Add a new robot to the directory. This record will be used as an
equivalent to VandyLDAP for verifying that the robot has a valid
"vunetid". A corresponding user record must be created to give
a robot access to the cluster.
The assigned vunetid must be in the valid robot range (9200-9999)
and must not conflict with an existing robot user, unless it
is a test user and in the test robot range (9100-9199)
:param str cn: The full name or descriptive name of the robot
:param str sn: The surname or shorter name of the robot
:param str vunetid: The vunetid-equivalent (posix username) for the
robot.
:param int uid: Numerical posix uid for the robot, must be in
the valid robot UID range (9100-9999)
:param str mail: Vanderilt-equivalent email address for the robot
:param str title: Title to indicate general purpose of the robot,
for example "testuser" for a test user
:param bool testrobot: If true, this robot is a test robot,
must be in the test robot UID range (9100-9199) and may
have a UID conflict
:param bool legacy_uid: If true, allow for a robot outside of the
valid robot UID range. The new robot UID must still not
conflict with other robots. Use with caution as this may
shadow actual VUNetID UIDs.
:returns: True on success, False on failure
:rtype: bool
"""
if not testrobot and not legacy_uid and not 9200 <= uid <= 9999:
raise ACCREValueError('Robot UID must be in the range 9200-9999')
if testrobot and not 9100 <= uid <= 9199:
raise ACCREValueError(
'Test bobot UID must be in the range 9100-9199'
)
if not testrobot:
self.conn.search(
'ou=Robots,dc=accre,dc=vanderbilt,dc=edu',
'(uid=*)',
attributes=['uid', 'uidNumber']
)
robots = {
e['uidNumber'].value: e['uid'].value
for e in self.conn.entries
}
if uid in robots:
raise ACCREValueError(
'UID {0} is already in use by {1}'
.format(uid, robots[uid])
)
props = {
"cn": cn,
"sn": sn,
"uid": vunetid,
"uidNumber": uid,
"gidNumber": 10000,
"homeDirectory": '/home/{}'.format(vunetid),
"mail": mail,
"title": title
}
dn = (
"cn={},ou=Robots,dc=accre,dc=vanderbilt,dc=edu"
.format(cn)
)
return self.conn.add(
dn,
[
"top", "person", "inetOrgPerson",
"posixAccount", "organizationalPerson"
],
props
)
[docs] def delete_robot(self, cn):
"""
Remove the specified robot record from the directory.
:param str cn: the full/descriptive name of the robot to be removed
:returns: True on success, False on failure
:rtype: bool
"""
dn = (
"cn={},ou=Robots,dc=accre,dc=vanderbilt,dc=edu"
.format(cn)
)
return self.conn.delete(dn)
[docs] def list_robots(self, active=False):
"""
Return a list of usernames of all registered robots currently
in the directory.
:param bool active: Only list active robots if set to true,
where a robot is defined as active if it has a title of 'locked'
:returns: All usernames of robots in the directory
:rtype: list(str)
"""
self.conn.search(
'ou=Robots,dc=accre,dc=vanderbilt,dc=edu',
'(uid=*)',
attributes=['uid', 'title']
)
if active:
return [
str(self.conn.entries[idx]['uid'])
for idx in range(len(self.conn.entries))
if 'locked' not in self.conn.entries[idx]['title']
]
else:
return [
str(self.conn.entries[idx]['uid'])
for idx in range(len(self.conn.entries))
]
[docs]class VUDS(BaseLDAP):
"""
Client for VUDS (Vanderbilt Directory Services) using the ACCRE
resource ID with administrative privileges to the ACCRE_Users
group in DS.
Note that this client has also inherited the functionality of the
old "VandyLDAP" client which queried eLDAP. As eLDAP has now been
retired, VandyLDAP is now a reference to this class which provides
the equivalent API as well as possible but querying VUDS instead.
In order to improve performance when checking multiple VUNetID entries,
instantiate the client with caching=True to avoid repeatedly fetching
the same ID.
"""
# FOR DS (old shared AD)
#config_section = 'vuit-dsadmin'
config_section = 'vuit-vudsadmin'
users_ou = CONFIG[config_section]['accre_users_ou']
vunetid_re = re.compile(
r'CN=([^,]+),OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu'
)
[docs] def list_accre_users(self):
"""
Get a list of current accre user VUNetIDs from the ACCRE_Users
group in DS.
:returns: VUNetIDs of all users in the ACCRE_Users DS group
:rtype: list(str)
"""
self.conn.search(
self.users_ou,
'(CN=ACCRE_Users)',
attributes=['member']
)
dns = self.conn.entries[0]['member']
return [self.vunetid_re.search(dn).group(1) for dn in dns]
[docs] def add_accre_users(self, new_users):
"""
Add new user VUNetIDs from a list to the ACCRE_Users group
in DS. Note that a single invalid VUNetID will cause the whole
operation to fail, so adding one at a time may be
preferable.
:param list(str) new_users: List of new user VUNetIDs to add
:returns: True on operation success, False on failure
:rtype: bool
"""
new_dns = [
f'CN={v},OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu' for v in new_users
]
return self.conn.modify(
f'CN=ACCRE_Users,{self.users_ou}',
{'member': [(ldap3.MODIFY_ADD, new_dns)]}
)
[docs] def delete_accre_users(self, del_users):
"""
Remove existing user VUNetIDs from a list to the ACCRE_Users group
in DS.
:param list(str) del_users: List of user VUNetIDs to delete
:returns: True on operation success, False on failure
:rtype: bool
"""
del_dns = [
f'CN={v},OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu' for v in del_users
]
return self.conn.modify(
f'CN=ACCRE_Users,{self.users_ou}',
{'member': [(ldap3.MODIFY_DELETE, del_dns)]}
)
[docs] def is_active(self, vunetid):
"""
Check if the given vunetid is an active employee, student, or
affiliate of the university
.. deprecated::
This logic is no longer valid for VUNet affiliate accounts,
do not use
:param str vunetid: the vunetid to be checked
:returns: True if the vunetid is active
:rtype: bool
"""
try:
entry = self._fetch_vunetid(vunetid)
except ACCREValueError:
return False
employee = _get_str(entry, 'vanderbiltPersonActiveEmployee')
student = _get_str(entry, 'vanderbiltPersonActiveStudent')
affiliate = _get_str(entry, 'eduPersonAffiliation')
if employee == 'Y' or student == 'ACTIVE' or affiliate == 'affiliate':
return True
return False
[docs] def is_locked(self, vunetid):
"""
Returns False.
In the past, this method
checked if the given vunetid is locked due to becoming an inactive
VUnetID or having an expired VUNet password.
.. deprecated::
This logic is no longer valid following the eLDAP to VUDS
transition as there is no nsaccountlocked property,
do not use.
:param str vunetid: the vunetid to be checked
:returns: False
:rtype: bool
"""
return False
return self.conn.compare(
'uid={0},OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu'.format(vunetid),
'nsaccountlock',
True
)
[docs] def exists(self, vunetid):
"""
Check if the given vunetid exists on the server
:param str vunetid: the vunetid to be checked
:returns: True if the vunetid exists
:rtype: bool
"""
try:
self._fetch_vunetid(vunetid)
return True
except ACCREValueError:
return False
[docs] def exists_numeric(self, uid):
"""
Check if the given numeric UID corresponds to a vunetid on the server
:param str uid: the numeric UID to be checked
:returns: True if the UID exists
:rtype: bool
"""
try:
self._fetch_vunetid_numeric(uid)
return True
except ACCREValueError:
return False
[docs] def info(self, vunetid):
"""
Return a dictionary of information about the specified
vunetid including the full name (fullname), uid (uid)
and Vanderbilt email (vanderbilt_email).
:param str vunetid: the vunetid to be checked
:returns: fullname, uid, and vanderbilt_email
:rtype: dict
"""
entry = self._fetch_vunetid(vunetid)
result = {}
result['fullname'] = str(entry.displayName)
result['uid'] = int(entry.uidNumber.value)
# Some VandyLDAP accounts are missing the mail attribute
try:
result['vanderbilt_email'] = str(entry.mail)
except (
ldap3.core.exceptions.LDAPCursorError,
ldap3.core.exceptions.LDAPKeyError
):
result['vanderbilt_email'] = ''
return result
[docs] def list_groups_by_gid(self, gid):
"""
Lists all group names in VUDS corresponding to a given numeric
gid. In principle this should return a list of no more than one
group, but in the past there have been gid collisions.
:param int gid: the numeric gid to be checked
:returns: names of all groups with the numeric gid in vuds
:rtype: list(str)
"""
self.conn.search(
'DC=vuds,DC=vanderbilt,DC=edu',
f'(gidnumber={gid})',
attributes=['cn']
)
# although pathogical, a single entry could have more than one
# cn, hence the need to loop over entries to produce a flat list
result = []
for entry in self.conn.entries:
names = entry.entry_attributes_as_dict.get('cn', [])
result.extend(names)
return result
[docs] def info_numeric(self, uid):
"""
Return a dictionary of information about the specified
vunetid including the full name (fullname), uid (uid)
and Vanderbilt email (vanderbilt_email).
:param str uid: the numeric UID to be checked
:returns: fullname, uid, and vanderbilt_email
:rtype: dict
"""
entry = self._fetch_vunetid_numeric(uid)
record = entry.entry_attributes_as_dict
result = {}
# FOR DS (old shared AD)
# result['uid'] = str(entry.cn.value)
result['uid'] = str(record.get("uid", ["(NONE)"])[0])
result['fullname'] = str(record.get("displayName", ["(NONE)"])[0])
result['vanderbilt_email'] = str(record.get("mail", ["(NONE)"])[0])
result['org'] = str(record.get("o", ["(NONE)"])[0])
result['givenName'] = str(record.get("givenName", ["(NONE)"])[0])
result['middleName'] = str(record.get("middleName", ["(NONE)"])[0])
result['sn'] = str(record.get("sn", ["(NONE)"])[0])
return result
def _fetch_vunetid(self, vunetid):
"""
Fetch and return a vunetid from the server. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the vunetid is not found on the server.
"""
return self._fetch_user(vunetid, 'OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu')
def _fetch_vunetid_numeric(self, uid):
"""
Fetch and return a vunetid from the server. If caching is true
and it is in the cache, return the cached value. Raise an exception
if the vunetid is not found on the server.
"""
# FOR DS (old shared AD)
# return self._fetch_num_uid(uid, 'CN=Users,DC=ds,DC=vanderbilt,DC=edu')
return self._fetch_num_uid(uid, 'OU=Users,OU=Accounts,DC=vuds,DC=vanderbilt,DC=edu')
# For script compatibility, the old VandyLDAP class is now a reference
# to VUDS which has a superset of the old VandyLDAP API. VandyLDAP
# was removed when the old eLDAP service was retired by VUIT
VandyLDAP = VUDS
def _get_str(entry, attr):
"""
Safely retrieve an attribute from an ldap3.abstract.entry.Entry
as a string, or return the empty string if it doesn't exist
You can't just do a getattr with a default due to the way ldap3 Entry
objects implement the dunder methods.
"""
try:
return str(getattr(entry, attr))
except (
ldap3.core.exceptions.LDAPCursorError,
ldap3.core.exceptions.LDAPKeyError
):
return ''
def _int_or_empty(value):
"""
Return an integer from the given value, or the empty string if
value is an empty string. Raise an exception otherwise
"""
try:
return int(value)
except ValueError:
if value == '':
return value
raise ACCREValueError(
'Value {0} is not an empty string or a valid int.'.format(value)
)
[docs]def vandy_cli():
"""
CLI entry point for Vanderbilt VUDS LDAP access
Run ``vandyldap --help`` for usage
"""
description = 'Commands for accessing Vanderbilt LDAP'
parser = accre_argparser('vandyldap', description=description)
subparsers = parser.add_subparsers()
parser_check = subparsers.add_parser(
'check',
help="Check the status and get information about a vunetid"
)
parser_check.add_argument('vunetid', type=str)
parser_check.set_defaults(func=_vandy_cli_check)
parser_checkuid = subparsers.add_parser(
'checkuid',
help="Check the status and get information about a vunetid by looking up the numeric UID"
)
parser_checkuid.add_argument('num_uid', type=str)
parser_checkuid.set_defaults(func=_vandy_cli_checkuid)
parser_batchuid = subparsers.add_parser(
'batchuid',
help="Take file with one UID number per line and generate a TSV file with info about each UID"
)
parser_batchuid.add_argument('uid_list_file', type=str)
parser_batchuid.set_defaults(func=_vandy_cli_batchuid)
args = parser.parse_args()
if hasattr(args, 'func') and args.func:
args.func(args)
else:
parser.print_help()
def _vandy_cli_check(args):
"""
Print out information about the specified VUNetID.
If the VUNetID does not exist on the LDAP server, print an
error message and return exit code 1.
"""
with VUDS(caching=True) as client:
if not client.exists(args.vunetid):
print('The VUNetID {0} does not exist'.format(args.vunetid))
sys.exit(1)
if client.is_active(args.vunetid):
active = 'Yes'
else:
active = 'No'
info = client.info(args.vunetid)
print('Full Name: {0}'.format(info['fullname']))
print('Active: {0}'.format(active))
print('UID: {0}'.format(info['uid']))
print('Vanderbilt Email: {0}'.format(info['vanderbilt_email']))
sys.exit(0)
def _vandy_cli_checkuid(args):
"""
Print out information about the VUNetID corresponding to the provided numeric UID.
If the VUNetID does not exist on the LDAP server, print an
error message and return exit code 1.
"""
with VUDS(caching=True) as client:
if not client.exists_numeric(args.num_uid):
print('The (numeric) UID {0} does not exist'.format(args.num_uid))
sys.exit(1)
info = client.info_numeric(args.num_uid)
print('Full Name: {0}'.format(info['fullname']))
print('UID: {0}'.format(info['uid']))
print('Vanderbilt Email: {0}'.format(info['vanderbilt_email']))
sys.exit(0)
def _vandy_cli_batchuid(args):
"""
Take file with one UID number per line and generate a TSV file with info about each UID
"""
print('\t'.join(['uidNumber','uid','o(org)','givenName','middleName','sn','mail','displayName']))
with VUDS(caching=True) as client:
with open(args.uid_list_file) as uidfile:
for line in uidfile:
num_uid = int(line)
if not client.exists_numeric(num_uid):
print('{0}\t{1}'.format(num_uid,'NOT_FOUND'))
continue
info = client.info_numeric(num_uid)
print('\t'.join([str(num_uid),info['uid'],info['org'],info['givenName'],info['middleName'],info['sn'],info['vanderbilt_email'],info['fullname']]))
sys.exit(0)
[docs]def accre_cli():
"""
CLI entry point for Vanderbilt LDAP access
Run ``accreldap --help`` for usage
"""
description = 'Commands for accessing ACCRE LDAP'
parser = accre_argparser('accreldap', description=description)
subparsers = parser.add_subparsers()
parser_user = subparsers.add_parser(
'user',
help="Check the status of or get information about an ACCRE user"
)
parser_user.add_argument('username', type=str)
parser_user.add_argument('--change-group',
action='store',
default=None,
help="New primary group to set the user to"
)
parser_user.add_argument('--add-group',
action='store',
default=None,
help="Secondary group (or comma delimited list of groups) to add the user to"
)
parser_user.add_argument('--remove-group',
action='store',
default=None,
help="Secondary group (or comma delimited list of groups) to remove the user from"
)
parser_user.set_defaults(func=_accre_cli_user)
args = parser.parse_args()
if hasattr(args, 'func') and args.func:
args.func(args)
else:
parser.print_help()
def _accre_cli_user(args):
"""
Print out information about the specified username, and optionally
make changes to the user.
"""
user = args.username
rootdn = False
if any([args.add_group, args.remove_group, args.change_group]):
rootdn=True
with ACCRELDAP(rootdn=rootdn) as client:
if not client.exists(user):
print(RedStr('Error: User {0} does not exist'.format(user)))
sys.exit(1)
if args.change_group:
if not client.group_exists(args.change_group):
print(RedStr('Error: invalid primary group'))
sys.exit(1)
result = client.change_user_group(user, args.change_group)
if not result:
print(RedStr('Error: failed to change group'))
sys.exit(1)
print(GreenStr('Successfully changed primary group'))
if args.add_group:
groups = args.add_group.split(',')
if not all(client.group_exists(g) for g in groups):
print(RedStr('Error: invalid secondary group'))
sys.exit(1)
for group in groups:
client.add_group_member(group, user)
print(GreenStr('Added {0} to group {1}'.format(
user, group)))
if args.remove_group:
groups = args.remove_group.split(',')
if not all(client.group_exists(g) for g in groups):
print(RedStr('Error: invalid secondary group'))
sys.exit(1)
for group in groups:
client.remove_group_member(group, user)
print(GreenStr('Removed {0} from group {1}'.format(
user, group)))
record = client.user_record(user)
_print_user_record(record)
sys.exit(0)
def _print_user_record(record):
precord = copy.deepcopy(record)
del precord['attributes']['objectClass']
if 'userPassword' in precord['attributes']:
del precord['attributes']['userPassword']
print('dn: {0}'.format(precord['dn']))
for attr in precord['attributes']:
print(' {0}: {1}'.format(
attr, ', '.join(str(x) for x in precord['attributes'][attr])
))
# File format hints for vim, if the modeline option is enabled (:set modeline=on) then 'modelines'
# number of lines (def:5) are searched at the start and end of file and settings found are adopted.
# Format: convert tabs to 4 spaces
# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab