lumi2/lumi2/ldap.py

1052 lines
32 KiB
Python

"""This module is an API used to interact with an OpenLDAP server.
Interactions include setting up authenticated connections, querying the DIT and
creating/reading/updating/deleting DIT entries.
Functions within this module rely heavily on the ldap3 module:
https://ldap3.readthedocs.io/en/latest/
"""
from string import ascii_lowercase, ascii_uppercase, digits
import json
from base64 import b64decode, b64encode
from io import BytesIO
from flask import current_app
from ldap3 import Connection, Server, ALL, Reader, Writer, ObjectDef, MODIFY_REPLACE
from PIL import Image
from lumi2.usermodel import User, Group
from lumi2.exceptions import MissingConfigKeyError
class InvalidStringFormatException(Exception):
    """Signals that a string does not have the format required of it."""
class InvalidConnectionException(Exception):
    """Signals that a Connection is not in a usable state."""
class EntryExistsException(Exception):
    """Signals that an entry exists although it was expected to be absent."""
class EntryNotFoundException(Exception):
    """Signals that an entry is absent although it was expected to exist."""
class AttributeNotFoundException(Exception):
    """Signals that an attribute expected on an entry is not set."""
class InvalidAttributeException(Exception):
    """Signals that an entry's attribute holds a value in an invalid format."""
def _assert_is_valid_base_dn(input_str) -> None:
    """Verify that the input string is a well-formed LDAP base DN.

    The accepted format is 'dc=<SLD>,dc=<TLD>', where:

    <SLD> (second level domain) consists solely of lowercase ASCII letters and
    digits and is at least one character long.

    <TLD> (top level domain) consists solely of lowercase ASCII letters and is
    at least two characters long.

    Parameters
    ----------
    input_str : string
        The candidate base DN.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If input_str is not of type string.
    InvalidStringFormatException
        If input_str does not match the format described above.
    """
    if not isinstance(input_str, str):
        raise TypeError(f"Expected a string but got: {type(input_str)}.")

    components = input_str.split(',')
    if len(components) != 2:
        raise InvalidStringFormatException("Expected exactly one ',' character.")
    if not all(component.startswith("dc=") for component in components):
        raise InvalidStringFormatException(
            "Expected DN component to start with 'dc='."
        )

    # Drop the 'dc=' prefix from both components.
    sld, tld = (component[3:] for component in components)
    if sld == "":
        raise InvalidStringFormatException(
            "Expected at least one character in second level domain."
        )
    if len(tld) < 2:
        raise InvalidStringFormatException(
            "Expected at least two characters in top level domain."
        )

    # Report the first offending character, SLD before TLD.
    sld_alphabet = ascii_lowercase + digits
    offender = next((c for c in sld if c not in sld_alphabet), None)
    if offender is not None:
        raise InvalidStringFormatException(
            "Expected only lowercase alphanumeric characters in second "
            f"level domain but got: '{offender}'."
        )
    offender = next((c for c in tld if c not in ascii_lowercase), None)
    if offender is not None:
        raise InvalidStringFormatException(
            "Expected only lowercase ascii characters in top level "
            f"domain but got: '{offender}'."
        )
def _assert_is_valid_ou_dn(input_str) -> None:
    """Checks whether the input string is a valid organizational unit DN.

    A valid ou DN is a string of the format 'ou=<NAME>,<BASE_DN>', whereby:

    <NAME> (name of organizational unit) is a substring containing only
    alphanumeric ASCII characters with minimum length 1.

    <BASE_DN> is a substring representing an LDAP base DN as expected by
    _assert_is_valid_base_dn().

    Parameters
    ----------
    input_str : string
        The input string to check.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If input_str is not of type string.
    InvalidStringFormatException
        If input_str is not a valid LDAP ou DN string.
    """
    if not isinstance(input_str, str):
        raise TypeError(f"Expected a string but got: {type(input_str)}.")

    tokens = input_str.split(',')
    # 'ou=<NAME>,dc=<SLD>,dc=<TLD>' has three components, i.e. two commas.
    if len(tokens) != 3:
        raise InvalidStringFormatException("Expected exactly two ',' characters.")
    if not tokens[0].startswith("ou="):
        raise InvalidStringFormatException("Expected 'ou='.")

    token_ou = tokens[0][3:]
    if not token_ou:
        raise InvalidStringFormatException("OU name cannot be empty.")

    token_base_dn = tokens[1] + "," + tokens[2]
    _assert_is_valid_base_dn(token_base_dn)

    valid_ou_chars = ascii_lowercase + ascii_uppercase + digits
    for char in token_ou:
        if char not in valid_ou_chars:
            raise InvalidStringFormatException(
                "Expected only alphanumeric characters in organizational unit "
                f"but got: '{char}'."
            )
def _assert_is_valid_user_dn(input_str) -> None:
    """Checks whether the input string is a valid user entry DN.

    A valid user DN is a string of the following format:
    'uid=<USERNAME>,ou=<USERS_OU>,<BASE_DN>', whereby:

    <USERNAME> (user's uid) is a substring containing only characters from the
    character class [A-Za-z0-9_.-], it has minimum length 1 and must start with
    a letter.

    <USERS_OU> (RDN of ou holding users) must be a valid ou name as expected by
    _assert_is_valid_ou_dn().

    <BASE_DN> is a substring representing an LDAP base DN as expected by
    _assert_is_valid_base_dn().

    Parameters
    ----------
    input_str : string
        The input string to check.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If input_str is not of type string.
    InvalidStringFormatException
        If input_str is not a valid LDAP user DN string.
    """
    if not isinstance(input_str, str):
        raise TypeError(f"Expected a string but got: {type(input_str)}.")

    tokens = input_str.split(',')
    # 'uid=..,ou=..,dc=..,dc=..' has four components, i.e. three commas.
    if len(tokens) != 4:
        raise InvalidStringFormatException("Expected exactly three ',' characters.")
    if not tokens[0].startswith("uid="):
        raise InvalidStringFormatException("Expected 'uid='.")

    token_uid = tokens[0][4:]
    if not token_uid:
        raise InvalidStringFormatException("UID cannot be empty.")

    letters = ascii_lowercase + ascii_uppercase
    valid_uid_chars = letters + digits + "_-."
    for char in token_uid:
        if char not in valid_uid_chars:
            raise InvalidStringFormatException(f"Invalid character in uid: '{char}'.")
    if token_uid[0] not in letters:
        raise InvalidStringFormatException("UID must start with a letter character.")

    # The base DN is checked first so that its errors take precedence over
    # errors in the ou component.
    token_base_dn = tokens[2] + "," + tokens[3]
    _assert_is_valid_base_dn(token_base_dn)
    token_ou_dn = tokens[1] + "," + token_base_dn
    _assert_is_valid_ou_dn(token_ou_dn)
def _assert_is_valid_group_dn(input_str) -> None:
    """Checks whether the input string is a valid group entry DN.

    A valid group DN is a string of the following format:
    'cn=<GROUPNAME>,ou=<GROUPS_OU>,<BASE_DN>', whereby:

    <GROUPNAME> (group name) is a substring containing only characters from the
    character class [A-Za-z], and it has minimum length 1.

    <GROUPS_OU> (RDN of ou holding groups) must be a valid ou name as expected
    by _assert_is_valid_ou_dn().

    <BASE_DN> is a substring representing an LDAP base DN as expected by
    _assert_is_valid_base_dn().

    Parameters
    ----------
    input_str : string
        The input string to check.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If input_str is not of type string.
    InvalidStringFormatException
        If input_str is not a valid LDAP group DN string.
    """
    if not isinstance(input_str, str):
        raise TypeError(f"Expected a string but got: {type(input_str)}.")

    tokens = input_str.split(',')
    # 'cn=..,ou=..,dc=..,dc=..' has four components, i.e. three commas.
    if len(tokens) != 4:
        raise InvalidStringFormatException("Expected exactly three ',' characters.")
    if not tokens[0].startswith("cn="):
        raise InvalidStringFormatException("Expected 'cn='.")

    token_cn = tokens[0][3:]
    if not token_cn:
        raise InvalidStringFormatException("Group CN cannot be empty.")

    valid_cn_chars = ascii_lowercase + ascii_uppercase
    for char in token_cn:
        if char not in valid_cn_chars:
            raise InvalidStringFormatException(
                f"Invalid character in group cn: '{char}'."
            )

    # The base DN is checked first so that its errors take precedence over
    # errors in the ou component.
    token_base_dn = tokens[2] + "," + tokens[3]
    _assert_is_valid_base_dn(token_base_dn)
    token_ou_dn = tokens[1] + "," + token_base_dn
    _assert_is_valid_ou_dn(token_ou_dn)
def _assert_is_valid_user_object_class(input_str) -> None:
    """Verify that the input string names a supported LDAP user object class.

    'inetOrgPerson' is currently the only accepted value.

    Parameters
    ----------
    input_str : string
        The candidate object class name.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If input_str is not of type string.
    InvalidStringFormatException
        If input_str is anything other than 'inetOrgPerson'.
    """
    if isinstance(input_str, str):
        if input_str != 'inetOrgPerson':
            raise InvalidStringFormatException(
                f"Expected 'inetOrgPerson' but got: '{input_str}'."
            )
        return
    raise TypeError(f"Expected a string but got: {type(input_str)}.")
def _assert_is_valid_group_object_class(input_str) -> None:
    """Verify that the input string names a supported LDAP group object class.

    'groupOfUniqueNames' is currently the only accepted value.

    Parameters
    ----------
    input_str : string
        The candidate object class name.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If input_str is not of type string.
    InvalidStringFormatException
        If input_str is anything other than 'groupOfUniqueNames'.
    """
    if isinstance(input_str, str):
        if input_str != 'groupOfUniqueNames':
            raise InvalidStringFormatException(
                f"Expected 'groupOfUniqueNames' but got: '{input_str}'."
            )
        return
    raise TypeError(f"Expected a string but got: {type(input_str)}.")
def _assert_is_valid_bind_user_dn(input_str) -> None:
    """Checks whether the input string is a valid LDAP bind user DN.

    A valid bind user DN is a string of the format 'cn=<USERNAME>,<BASE_DN>',
    whereby:

    <USERNAME> (name of bind user) is a substring containing only
    alphanumeric ASCII characters with minimum length 1.

    <BASE_DN> is a substring representing an LDAP base DN as expected by
    _assert_is_valid_base_dn().

    Parameters
    ----------
    input_str : string
        The input string to check.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If input_str is not of type string.
    InvalidStringFormatException
        If input_str is not a valid bind user DN.
    """
    if not isinstance(input_str, str):
        raise TypeError(f"Expected a string but got: {type(input_str)}.")

    tokens = input_str.split(',')
    if len(tokens) != 3:
        raise InvalidStringFormatException("Expected exactly two ',' characters.")
    if not tokens[0].startswith("cn="):
        raise InvalidStringFormatException("Expected 'cn='.")

    # Take everything AFTER the 'cn=' prefix. Slicing with [:3] would yield the
    # prefix itself, whose '=' fails the alphanumeric check for every input.
    token_cn = tokens[0][3:]
    if not token_cn:
        raise InvalidStringFormatException("CN cannot be empty.")

    token_base_dn = tokens[1] + "," + tokens[2]
    _assert_is_valid_base_dn(token_base_dn)

    valid_cn_chars = ascii_lowercase + ascii_uppercase + digits
    for char in token_cn:
        if char not in valid_cn_chars:
            raise InvalidStringFormatException(
                "Expected only alphanumeric characters in common name "
                f"but got: '{char}'."
            )
def _assert_app_config_is_valid() -> None:
    """Validate the LDAP-related settings of the current app's config.

    Every key listed below must be declared and hold a string value:

    - 'LDAP_HOSTNAME'
    - 'LDAP_BIND_USER_DN'
    - 'LDAP_BIND_USER_PASSWORD'
    - 'LDAP_BASE_DN'
    - 'LDAP_USERS_OU'
    - 'LDAP_GROUPS_OU'
    - 'LDAP_USER_OBJECT_CLASS'
    - 'LDAP_GROUP_OBJECT_CLASS'

    All of them except 'LDAP_HOSTNAME' and 'LDAP_BIND_USER_PASSWORD' are
    additionally checked for a valid format by the matching
    _assert_is_valid_*() helper.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If any of the LDAP config values are not of type string.
    MissingConfigKeyError
        If any of the above-mentioned LDAP config keys are not declared.
    InvalidStringFormatException
        If any of the format-checked LDAP config values are in an invalid
        format.
    """
    config = current_app.config

    # Presence and type are verified key by key, in this order.
    for key in (
        'LDAP_HOSTNAME',
        'LDAP_BIND_USER_DN',
        'LDAP_BIND_USER_PASSWORD',
        'LDAP_BASE_DN',
        'LDAP_USERS_OU',
        'LDAP_GROUPS_OU',
        'LDAP_USER_OBJECT_CLASS',
        'LDAP_GROUP_OBJECT_CLASS',
    ):
        if key not in config:
            raise MissingConfigKeyError(
                f"App config: key '{key}' was not provided."
            )
        if not isinstance(config[key], str):
            raise TypeError(
                f"Expected value of app config key '{key}' to be of type string."
            )

    # Format checks run only after all keys are known to be present strings.
    format_checks = (
        ('LDAP_BASE_DN', _assert_is_valid_base_dn),
        ('LDAP_BIND_USER_DN', _assert_is_valid_bind_user_dn),
        ('LDAP_USERS_OU', _assert_is_valid_ou_dn),
        ('LDAP_GROUPS_OU', _assert_is_valid_ou_dn),
        ('LDAP_USER_OBJECT_CLASS', _assert_is_valid_user_object_class),
        ('LDAP_GROUP_OBJECT_CLASS', _assert_is_valid_group_object_class),
    )
    for key, validate in format_checks:
        validate(config[key])
def get_connection() -> Connection:
    """Open a Connection to the LDAP server described by the app config.

    The hostname is read from 'LDAP_HOSTNAME' and the bind credentials from
    'LDAP_BIND_USER_DN' and 'LDAP_BIND_USER_PASSWORD'. The Connection is
    created with auto_bind enabled.

    Returns
    -------
    ldap3.Connection
        A bound (i.e. authenticated), active Connection object to the server.

    Raises
    ------
    ldap3.core.exceptions.LDAPSocketOpenError
        If the server specified by the hostname cannot be reached.
    ldap3.core.exceptions.LDAPBindError
        If the bind credentials (user DN and/or password) are invalid.
    """
    config = current_app.config
    hostname, bind_dn, bind_password = (
        config[key]
        for key in ('LDAP_HOSTNAME', 'LDAP_BIND_USER_DN', 'LDAP_BIND_USER_PASSWORD')
    )
    server = Server(hostname, get_info=ALL)
    return Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
def _assert_is_valid_connection(connection: Connection) -> None:
    """Ensures that the connection is valid, open and bound.

    Parameters
    ----------
    connection : ldap3.Connection
        Connection object to an LDAP server.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If connection is not of type ldap3.Connection.
    InvalidConnectionException
        If the connection is closed.
        If the connection is not bound.
    """
    if not isinstance(connection, Connection):
        raise TypeError(f"Expected a Connection object but got '{type(connection)}'.")
    # ldap3 exposes `open` as a callable on the Connection, so testing its
    # truthiness never detects a closed connection; the `closed` flag is the
    # actual state indicator.
    if connection.closed:
        raise InvalidConnectionException("Connection is not open.")
    if not connection.bound:
        raise InvalidConnectionException("Connection is not bound.")
def ou_exists(connection: Connection, ou_dn: str) -> bool:
    """Report whether the given organizational unit is present in the DIT.

    The name of the ou is taken from the first component of ou_dn and compared
    against the 'ou' attribute of every organizationalUnit entry found by a
    search based at the 'LDAP_BASE_DN' from the app config.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    ou_dn : str
        DN of an organizational unit (ou) directly below the DIT's root entry.

    Returns
    -------
    bool
        True if an organizationalUnit with that name was found, and False
        otherwise.

    Raises
    ------
    TypeError
        If connection is not a Connection or ou_dn is not a string.
    InvalidConnectionException
        If the connection is rejected by _assert_is_valid_connection().
    InvalidStringFormatException
        If ou_dn is not a valid ou DN.
    """
    _assert_is_valid_connection(connection)
    _assert_is_valid_ou_dn(ou_dn)

    rdn, _, _ = ou_dn.partition(',')
    wanted_name = rdn[len("ou="):]

    reader = Reader(
        connection,
        ObjectDef('organizationalUnit', connection),
        current_app.config['LDAP_BASE_DN'],
    )
    return any(entry.ou == wanted_name for entry in reader.search())
def create_ou(connection: Connection, ou_dn: str) -> None:
    """Add an organizationalUnit entry at the given DN.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    ou_dn : str
        DN of an organizational unit (ou) directly below the DIT's root entry.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If connection is not a Connection or ou_dn is not a string.
    InvalidConnectionException
        If the connection is rejected by _assert_is_valid_connection().
    InvalidStringFormatException
        If ou_dn is not a valid ou DN.
    EntryExistsException
        If ou_exists() reports that the ou is already present.
    """
    _assert_is_valid_connection(connection)
    _assert_is_valid_ou_dn(ou_dn)

    already_present = ou_exists(connection, ou_dn)
    if not already_present:
        connection.add(ou_dn, 'organizationalUnit')
        return
    raise EntryExistsException(f"Cannot create '{ou_dn}': entry already exists.")
def user_exists(connection: Connection, user_dn: str) -> bool:
    """Report whether an inetOrgPerson entry is found at the given user DN.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    user_dn : str
        DN of a user entry (uid) directly below the LDAP_USERS_OU entry.

    Returns
    -------
    bool
        True if the search based at user_dn yielded at least one entry, and
        False otherwise.

    Raises
    ------
    TypeError
        If connection is not a Connection or user_dn is not a string.
    InvalidConnectionException
        If the connection is rejected by _assert_is_valid_connection().
    InvalidStringFormatException
        If user_dn is not a valid user DN.
    """
    _assert_is_valid_connection(connection)
    _assert_is_valid_user_dn(user_dn)

    search_filter = '(objectclass=inetOrgPerson)'
    connection.search(user_dn, search_filter)
    return bool(connection.entries)
def get_user(connection: Connection, uid: str) -> User:
    """Retrieves the user with the specified uid (username) from the LDAP server.

    The DIT is queried for an entry of class 'inetOrgPerson' at the DN
    'uid=<uid>,<LDAP_USERS_OU>', using the users ou specified in the app config.
    The retrieved attributes are parsed and used to create a User object.

    The following attributes are expected to be set for the entry:
    'cn', 'sn', 'displayName', 'uid', 'userPassword', 'jpegPhoto', 'mail'.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    uid : str
        Username of the user to be retrieved.

    Returns
    -------
    lumi2.usermodel.User
        The User object representing the retrieved user.

    Raises
    ------
    EntryNotFoundException
        If no user with the specified uid is found.
    AttributeNotFoundException
        If one of the above-mentioned attributes for the user is unset.
    TypeError
        If uid is not of type string.
    InvalidAttributeException
        If an attribute's value is in an invalid format.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(uid, str):
        raise TypeError(f"Expected a string but got: '{type(uid)}'.")

    user_dn = "uid=" + uid + ',' + current_app.config['LDAP_USERS_OU']
    required_attributes = [
        'cn',
        'sn',
        'displayName',
        'uid',
        'userPassword',
        'jpegPhoto',
        'mail',
    ]
    connection.search(
        user_dn, '(objectclass=inetOrgPerson)', attributes=required_attributes,
    )
    if not connection.entries:
        raise EntryNotFoundException(f"No such user found: '{user_dn}'.")
    entry = connection.entries[0]

    # Convert entry to JSON and load attributes into a dict
    attributes = json.loads(entry.entry_to_json())['attributes']
    for attribute in required_attributes:
        # A requested attribute that is unset may be reported as an empty list
        # rather than being left out, so an empty value counts as missing too.
        if not attributes.get(attribute):
            raise AttributeNotFoundException(
                f"Attribute '{attribute}' not set in entry '{user_dn}'."
            )

    username = attributes['uid'][0]
    email = attributes['mail'][0]
    first_name = attributes['cn'][0]
    last_name = attributes['sn'][0]
    display_name = attributes['displayName'][0]

    # Retrieve base64-encoded password hash prefixed with '{SHA512}'
    password_hash = attributes['userPassword'][0]
    expected_hash_type = '{SHA512}'
    if (
        not isinstance(password_hash, str)
        or not password_hash.startswith(expected_hash_type)
    ):
        # The stored hash is deliberately kept out of the message so that it
        # cannot end up in logs or error pages.
        raise InvalidAttributeException(
            f"Unexpected password hash in entry '{user_dn}': expected it to "
            f"start with '{expected_hash_type}'."
        )
    # Strip prefix
    password_hash = password_hash[len(expected_hash_type):]

    # Binary values are serialized to JSON as a dict holding the base64 text
    # under the key 'encoded'.
    picture_json = attributes['jpegPhoto'][0]
    if not isinstance(picture_json, dict) or 'encoded' not in picture_json:
        raise InvalidAttributeException(
            f"Unexpected jpegPhoto value in entry '{user_dn}': expected "
            f"base64-encoded binary data."
        )
    # Decode base64-encoded picture and create a PIL Image object
    picture = Image.open(BytesIO(b64decode(picture_json['encoded'])))

    return User(
        username, password_hash, email,
        first_name, last_name, display_name,
        picture
    )
def create_user(connection: Connection, user: User) -> None:
    """Creates an entry from the specified User object on the LDAP server.

    The entry is added at user.get_dn() with object class 'inetOrgPerson'. The
    user's picture is stored as JPEG data in the 'jpegPhoto' attribute and the
    password hash is stored with a '{SHA512}' prefix.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    user : lumi2.usermodel.User
        The User object from which a user LDAP entry will be created.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If user is not of type User.
    EntryExistsException
        If a user entry with the same uid/username already exists in the DIT.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(user, User):
        raise TypeError(f"Expected a User but got: '{type(user)}'.")

    # Existence is probed via get_user(); only the "not found" outcome allows
    # the creation to proceed.
    try:
        get_user(connection, user.username)
    except EntryNotFoundException:
        pass
    else:
        raise EntryExistsException(f"User already exists: '{user.username}'.")

    user_image_bytes = BytesIO()
    user.picture.save(user_image_bytes, format="jpeg")

    attributes = {
        "uid": user.username,
        "userPassword": "{SHA512}" + user.password_hash,
        "cn": user.first_name,
        "sn": user.last_name,
        "displayName": user.display_name,
        "mail": user.email,
        "jpegPhoto": user_image_bytes.getvalue(),
    }
    connection.add(user.get_dn(), "inetOrgPerson", attributes)
def update_user(connection: Connection, user: User) -> None:
    """Overwrite the attributes of the user's LDAP entry with the User's values.

    The attributes 'userPassword', 'mail', 'cn', 'sn', 'displayName' and
    'jpegPhoto' of the entry at user.get_dn() are replaced by the values held
    by the User object. The picture is written as JPEG data.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    user : lumi2.usermodel.User
        User object whose LDAP entry will be updated.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If user is not of type User.
    EntryNotFoundException
        If no entry for the specified user's uid (username) exists in the LDAP
        server's DIT.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(user, User):
        raise TypeError(f"Expected a User but got: '{type(user)}'.")

    user_dn = user.get_dn()
    if not user_exists(connection, user_dn):
        raise EntryNotFoundException(
            f"Failed to update user '{user.username}': no entry exists for "
            f"'{user_dn}'."
        )

    def replace_with(value):
        # Shape expected by Connection.modify() for a single-value replacement.
        return [(MODIFY_REPLACE, [value])]

    jpeg_buffer = BytesIO()
    user.picture.save(jpeg_buffer, format="jpeg")

    changes = {
        "userPassword": replace_with("{SHA512}" + user.password_hash),
        "mail": replace_with(user.email),
        "cn": replace_with(user.first_name),
        "sn": replace_with(user.last_name),
        "displayName": replace_with(user.display_name),
        "jpegPhoto": replace_with(jpeg_buffer.getvalue()),
    }
    connection.modify(user_dn, changes)
def delete_user(connection: Connection, uid: str) -> None:
    """Remove the user with the given uid (username) from the LDAP server.

    Every Group whose only member is this user is deleted as well, before the
    user entry itself is removed.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    uid : str
        Username of the user to be deleted.

    Returns
    -------
    None

    Raises
    ------
    EntryNotFoundException
        If no user with the specified uid is found.
    TypeError
        If uid is not of type string.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(uid, str):
        raise TypeError(f"Expected a string but got: '{type(uid)}'.")

    users_ou = current_app.config['LDAP_USERS_OU']
    user_dn = "uid=" + uid + ',' + users_ou
    if not user_exists(connection, user_dn):
        raise EntryNotFoundException(f"No such user entry: '{user_dn}'.")

    # Groups need at least one member, so a group that would be left empty by
    # this deletion has to go first.
    orphaned_groups = {
        group
        for group in get_groups_of_user(connection, get_user(connection, uid))
        if len(group.members) == 1
    }
    for group in orphaned_groups:
        delete_group(connection, group.groupname)

    connection.delete(user_dn)
def get_users(connection: Connection) -> set[User]:
    """Collect all Users stored on the LDAP server.

    A search based at the 'LDAP_USERS_OU' from the app config lists the uid of
    every inetOrgPerson entry; each uid is then resolved with get_user().

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.

    Returns
    -------
    set[User]
        One User per entry found; empty if the search yielded no entries.
    """
    _assert_is_valid_connection(connection)

    users_ou = current_app.config['LDAP_USERS_OU']
    connection.search(users_ou, "(objectclass=inetOrgPerson)", attributes=['uid'])

    # get_user() issues searches of its own on the same connection, so the
    # result list of this search is fetched exactly once, up front.
    found_entries = connection.entries
    return {get_user(connection, str(entry.uid)) for entry in found_entries}
def group_exists(connection: Connection, group_dn: str) -> bool:
    """Report whether a groupOfUniqueNames entry is found at the given group DN.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    group_dn : str
        DN of a group entry (cn) directly below the LDAP_GROUPS_OU entry.

    Returns
    -------
    bool
        True if the search based at group_dn yielded at least one entry, and
        False otherwise.

    Raises
    ------
    TypeError
        If connection is not a Connection or group_dn is not a string.
    InvalidConnectionException
        If the connection is rejected by _assert_is_valid_connection().
    InvalidStringFormatException
        If group_dn is not a valid group DN.
    """
    _assert_is_valid_connection(connection)
    _assert_is_valid_group_dn(group_dn)

    search_filter = '(objectclass=groupOfUniqueNames)'
    connection.search(group_dn, search_filter)
    return bool(connection.entries)
def create_group(connection: Connection, group: Group) -> None:
    """Add a groupOfUniqueNames entry for the given Group.

    The DN of every member is stored in the entry's 'uniqueMember' attribute.
    All members of the group must already exist as users in the DIT.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    group : lumi2.usermodel.Group
        The Group for which an entry is to be created on the LDAP server.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If group is not of type Group.
    EntryNotFoundException
        If a user who is a member of the Group does not exist in the DIT.
    EntryExistsException
        If a group with this name already exists on the LDAP server.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(group, Group):
        raise TypeError(f"Expected a lumi2.usermodel.Group but got: '{type(group)}'.")

    group_dn = group.get_dn()
    if group_exists(connection, group_dn):
        raise EntryExistsException(
            f"Failed to create group '{group.groupname}': entry exists already."
        )

    # Each member is verified before anything is written to the server.
    member_dns = []
    for member in group.members:
        member_dn = member.get_dn()
        if user_exists(connection, member_dn):
            member_dns.append(member_dn)
            continue
        raise EntryNotFoundException(
            f"Failed to create group '{group.groupname}': no entry found for "
            f"user '{member.username}'."
        )

    connection.add(group_dn, "groupOfUniqueNames", {"uniqueMember": member_dns})
def delete_group(connection: Connection, group_cn: str) -> None:
    """Remove the group with the given CN from the LDAP server.

    The group's DN is built as 'cn=<group_cn>,<LDAP_GROUPS_OU>' using the
    groups ou from the app config.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    group_cn : str
        The CN (common name) of the Group whose entry will be removed from the
        LDAP server's DIT.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If group_cn is not of type str.
    InvalidStringFormatException
        If the resulting group DN is not a valid group DN.
    EntryNotFoundException
        If no group with the specified CN exists in the DIT.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(group_cn, str):
        raise TypeError(f"Expected a string but got: '{type(group_cn)}'.")

    groups_ou = current_app.config['LDAP_GROUPS_OU']
    group_dn = f"cn={group_cn},{groups_ou}"
    _assert_is_valid_group_dn(group_dn)

    if group_exists(connection, group_dn):
        connection.delete(group_dn)
        return
    raise EntryNotFoundException(
        f"Failed to delete group '{group_cn}': no such entry found: "
        f"'{group_dn}'."
    )
def get_group(connection: Connection, group_cn: str) -> Group:
    """Load the group with the given CN (common name) from the LDAP server.

    The entry at 'cn=<group_cn>,<LDAP_GROUPS_OU>' is read with its 'cn' and
    'uniqueMember' attributes. Each member DN is resolved to a User via
    get_user(), using the value of the DN's first component as the uid.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    group_cn : str
        CN (group name) of the group to be retrieved.

    Returns
    -------
    lumi2.usermodel.Group
        The Group object representing the retrieved Group.

    Raises
    ------
    EntryNotFoundException
        If no group with the specified group_cn is found.
    AttributeNotFoundException
        If 'cn' or 'uniqueMember' is absent from the retrieved entry.
    InvalidStringFormatException
        If the resulting group DN is not a valid group DN.
    TypeError
        If group_cn is not of type string.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(group_cn, str):
        raise TypeError(f"Expected a string but got: '{type(group_cn)}'.")

    groups_ou = current_app.config['LDAP_GROUPS_OU']
    group_dn = f"cn={group_cn},{groups_ou}"
    _assert_is_valid_group_dn(group_dn)

    wanted = ["cn", "uniqueMember"]
    connection.search(
        group_dn, '(objectclass=groupOfUniqueNames)', attributes=wanted,
    )
    if not connection.entries:
        raise EntryNotFoundException(f"No such group found: '{group_cn}'.")

    # The entry is round-tripped through JSON to obtain plain dicts and lists.
    entry_attrs = json.loads(connection.entries[0].entry_to_json())['attributes']
    absent = next((name for name in wanted if name not in entry_attrs), None)
    if absent is not None:
        raise AttributeNotFoundException(
            f"Attribute '{absent}' not set in entry '{group_dn}'."
        )

    groupname = entry_attrs['cn'][0]
    # The first DN component is 'uid=<username>'; [4:] drops the 'uid=' part.
    members = {
        get_user(connection, member_dn.split(',')[0][4:])
        for member_dn in entry_attrs['uniqueMember']
    }
    return Group(groupname, members)
def get_groups(connection: Connection) -> set[Group]:
    """Collect all Groups stored on the LDAP server.

    A search based at the 'LDAP_GROUPS_OU' from the app config lists the cn of
    every groupOfUniqueNames entry; each cn is then resolved with get_group().

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.

    Returns
    -------
    set[Group]
        One Group per entry found; empty if the search yielded no entries.
    """
    _assert_is_valid_connection(connection)

    groups_ou = current_app.config['LDAP_GROUPS_OU']
    connection.search(groups_ou, "(objectclass=groupOfUniqueNames)", attributes=['cn'])

    # get_group() issues searches of its own on the same connection, so the
    # result list of this search is fetched exactly once, up front.
    found_entries = connection.entries
    return {get_group(connection, str(entry.cn)) for entry in found_entries}
def get_groups_of_user(connection: Connection, user: User) -> set[Group]:
    """Retrieves the set of Groups of which the specified user is a member.

    All Groups are loaded via get_groups() and filtered by membership of the
    given User.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    user : lumi2.usermodel.User
        The User whose Groups to retrieve.

    Returns
    -------
    set[Group]
        A set containing all Groups of which the user is a member.
        If the specified User is not present in any groups, returns an empty set.

    Raises
    ------
    EntryNotFoundException
        If no entry for the specified User exists.
    TypeError
        If user is not of type User.
    """
    _assert_is_valid_connection(connection)
    # Enforce the documented contract up front instead of failing with an
    # AttributeError on user.get_dn() further down.
    if not isinstance(user, User):
        raise TypeError(f"Expected a User but got: '{type(user)}'.")

    user_dn = user.get_dn()
    if not user_exists(connection, user_dn):
        raise EntryNotFoundException(
            f"Failed to get Groups of User '{user.username}': the entry "
            f"'{user_dn}' does not exist."
        )

    return {group for group in get_groups(connection) if user in group.members}