lumi2/lumi2/ldap.py

644 lines
20 KiB
Python

"""This module is an API used to interact with an OpenLDAP server.
Interactions include setting up authenticated connections, querying the DIT and
creating/reading/updating/deleting DIT entries.
Functions within this module rely heavily on the ldap3 module:
https://ldap3.readthedocs.io/en/latest/
"""
from string import ascii_lowercase, ascii_uppercase, digits
import json
from base64 import b64decode, b64encode
from io import BytesIO
from flask import current_app
from ldap3 import Connection, Server, ALL, Reader, Writer, ObjectDef
from PIL import Image
from lumi2.usermodel import User
from lumi2.exceptions import MissingConfigKeyError
class InvalidStringFormatException(Exception):
    """Raised when a string does not match the format that was expected."""
class InvalidConnectionException(Exception):
    """Raised when a Connection is not in a usable state."""
class EntryExistsException(Exception):
    """Raised when an entry is found to exist although it should not."""
class EntryNotFoundException(Exception):
    """Raised when an entry that was expected to exist cannot be found."""
class AttributeNotFoundException(Exception):
    """Raised when an entry's attribute is unexpectedly not set."""
class InvalidAttributeException(Exception):
    """Raised when an entry's attribute holds a value in an invalid format."""
def _assert_is_valid_base_dn(input_str) -> None:
"""Checks whether the input string is a valid LDAP base DN.
A valid base DN is a string of the format 'dc=<SLD>,dc=<TLD>', whereby:
<SLD> (second level domain) is a substring containing only lowercase
alphanumeric characters with minimum length 1.
<TLD> (top level domain) is a substring containing only lowercase latin
characters with minimum length 2.
Parameters
----------
input_str : string
The input string to check.
Returns
-------
None
Raises
------
TypeError
If input_str is not of type string.
InvalidStringFormatException
If input_str is not a valid LDAP base DN.
"""
if not isinstance(input_str, str):
raise TypeError(f"Expected a string but got: {type(input_str)}.")
tokens = input_str.split(',')
if len(tokens) != 2:
raise InvalidStringFormatException("Expected exactly one ',' character.")
for token in tokens:
if not token.startswith("dc="):
raise InvalidStringFormatException(
"Expected DN component to start with 'dc='."
)
token_sld = tokens[0][3:]
token_tld = tokens[1][3:]
if not len(token_sld):
raise InvalidStringFormatException(
"Expected at least one character in second level domain."
)
if not len(token_tld) >= 2:
raise InvalidStringFormatException(
"Expected at least two characters in top level domain."
)
for char in token_sld:
if char not in ascii_lowercase + digits:
raise InvalidStringFormatException(
f"Expected only lowercase alphanumeric characters in second " \
f"level domain but got: '{char}'."
)
for char in token_tld:
if char not in ascii_lowercase:
raise InvalidStringFormatException(
f"Expected only lowercase ascii characters in top level " \
f"domain but got: '{char}'."
)
def _assert_is_valid_ou_dn(input_str) -> None:
"""Checks whether the input string is a valid organizational unit DN.
A valid ou DN is a string of the format 'ou=<NAME>,<BASE_DN>', whereby:
<NAME> (name of organizational unit) is a substring containing only
alphanumeric characters with minimum length 1.
<BASE_DN> is a substring representing an LDAP base DN as expected by
_assert_is_valid_base_dn().
Parameters
----------
input_str : string
The input string to check.
Returns
-------
None
Raises
------
TypeError
If input_str is not of type string.
InvalidStringFormatException
If input_str is not a valid LDAP ou DN string.
"""
if not isinstance(input_str, str):
raise TypeError(f"Expected a string but got: {type(input_str)}.")
tokens = input_str.split(',')
if len(tokens) != 3:
raise InvalidStringFormatException("Expected exactly three ',' characters.")
if not tokens[0].startswith("ou="):
raise InvalidStringFormatException("Expected 'ou='.")
token_ou = tokens[0][3:]
if not len(token_ou):
raise InvalidStringFormatException("OU name cannot be empty.")
token_base_dn = tokens[1] + "," + tokens[2]
_assert_is_valid_base_dn(token_base_dn)
for char in token_ou:
if not char in ascii_lowercase + ascii_uppercase + digits:
raise InvalidStringFormatException(
f"Expected only alphanumeric characters in organizational unit " \
f"but got: '{char}'."
)
def _assert_is_valid_user_dn(input_str) -> None:
"""Checks whether the input string is a valid user entry DN.
A valid user DN is a string of the following format:
'uid=<USERNAME>,ou=<USERS_OU>,<BASE_DN>', whereby:
<USERNAME> (user's uid) is a substring containing only characters from the
character class [A-Za-z0-9_-.], it has minimum length 1 and must start with a
letter.
<USERS_OU> (RDN of ou holding users) must be a valid ou name as expected by
_assert_is_valid_ou_dn().
<BASE_DN> is a substring representing an LDAP base DN as expected by
_assert_is_valid_base_dn().
Parameters
----------
input_str : string
The input string to check.
Returns
-------
None
Raises
------
TypeError
If input_str is not of type string.
InvalidStringFormatException
If input_str is not a valid LDAP user DN string.
"""
if not isinstance(input_str, str):
raise TypeError(f"Expected a string but got: {type(input_str)}.")
tokens = input_str.split(',')
if len(tokens) != 4:
raise InvalidStringFormatException("Expected exactly four ',' characters.")
if not tokens[0].startswith("uid="):
raise InvalidStringFormatException("Expected 'uid='.")
token_uid = tokens[0][4:]
if not len(token_uid):
raise InvalidStringFormatException("UID cannot be empty.")
valid_uid_chars = ascii_lowercase + ascii_uppercase + digits + "_-."
for char in token_uid:
if not char in valid_uid_chars:
raise InvalidStringFormatException(f"Invalid character in uid: '{char}'.")
if not token_uid[0] in ascii_lowercase + ascii_uppercase:
raise InvalidStringFormatException("UID must start with a letter character.")
token_base_dn = tokens[2] + "," + tokens[3]
_assert_is_valid_base_dn(token_base_dn)
token_ou_dn = tokens[1] + "," + token_base_dn
_assert_is_valid_ou_dn(token_ou_dn)
def _assert_is_valid_group_dn(input_str) -> None:
"""Checks whether the input string is a valid group entry DN.
A valid group DN is a string of the following format:
'cn=<GROUPNAME>,ou=<GROUPS_OU>,<BASE_DN>', whereby:
<GROUPNAME> (group name) is a substring containing only characters from the
character class [A-Za-z], and it has minimum length 1.
<GROUPS_OU> (RDN of ou holding groups) must be a valid ou name as expected by
_assert_is_valid_ou_dn().
<BASE_DN> is a substring representing an LDAP base DN as expected by
_assert_is_valid_base_dn().
Parameters
----------
input_str : string
The input string to check.
Returns
-------
None
Raises
------
TypeError
If input_str is not of type string.
InvalidStringFormatException
If input_str is not a valid LDAP group DN string.
"""
if not isinstance(input_str, str):
raise TypeError(f"Expected a string but got: {type(input_str)}.")
tokens = input_str.split(',')
if len(tokens) != 4:
raise InvalidStringFormatException("Expected exactly four ',' characters.")
if not tokens[0].startswith("cn="):
raise InvalidStringFormatException("Expected 'cn='.")
token_cn = tokens[0][3:]
if not len(token_cn):
raise InvalidStringFormatException("UID cannot be empty.")
for char in token_cn:
if not char in ascii_lowercase + ascii_uppercase:
raise InvalidStringFormatException(f"Invalid character in group cn: '{char}'.")
token_base_dn = tokens[2] + "," + tokens[3]
_assert_is_valid_base_dn(token_base_dn)
token_ou_dn = tokens[1] + "," + token_base_dn
_assert_is_valid_ou_dn(token_ou_dn)
def _assert_is_valid_user_object_class(input_str) -> None:
"""Checks whether the input string is a valid LDAP user object class.
The only valid user object class is currently 'inetOrgPerson'.
Parameters
----------
input_str : string
The input string to check.
Returns
-------
None
Raises
------
TypeError
If input_str is not of type string.
InvalidStringFormatException
If input_str is not 'inetOrgPerson'.
"""
if not isinstance(input_str, str):
raise TypeError(f"Expected a string but got: {type(input_str)}.")
if not input_str == 'inetOrgPerson':
raise InvalidStringFormatException(
f"Expected 'inetOrgPerson' but got: '{input_str}'."
)
def _assert_is_valid_group_object_class(input_str) -> None:
"""Checks whether the input string is a valid LDAP group object class.
The only valid group object class is currently 'groupOfUniqueNames'.
Parameters
----------
input_str : string
The input string to check.
Returns
-------
None
Raises
------
TypeError
If input_str is not of type string.
InvalidStringFormatException
If input_str is not 'groupOfUniqueNames'.
"""
if not isinstance(input_str, str):
raise TypeError(f"Expected a string but got: {type(input_str)}.")
if not input_str == 'groupOfUniqueNames':
raise InvalidStringFormatException(
f"Expected 'groupOfUniqueNames' but got: '{input_str}'."
)
def _assert_is_valid_bind_user_dn(input_str) -> None:
"""Checks whether the input string is a valid LDAP bind user DN.
A valid bind user DN is a string of the format 'cn=<USERNAME>,<BASE_DN>',
whereby:
<USERNAME> (name of bind user) is a substring containing only
alphanumeric characters with minimum length 1.
<BASE_DN> is a substring representing an LDAP base DN as expected by
_assert_is_valid_base_dn().
Parameters
----------
input_str : string
The input string to check.
Returns
-------
None
Raises
------
TypeError
If input_str is not of type string.
InvalidStringFormatException
If input_str is not a valid bind user DN.
"""
if not isinstance(input_str, str):
raise TypeError(f"Expected a string but got: {type(input_str)}.")
tokens = input_str.split(',')
if len(tokens) != 3:
raise InvalidStringFormatException("Expected exactly two ',' characters.")
if not tokens[0].startswith("cn="):
raise InvalidStringFormatException("Expected 'cn='.")
token_cn = tokens[0][:3]
token_base_dn = tokens[1] + "," + tokens[2]
_assert_is_valid_base_dn(token_base_dn)
for char in token_cn:
if not char in ascii_lowercase + ascii_uppercase + digits:
raise InvalidStringFormatException(
f"Expected only alphanumeric characters in common name " \
f"but got: '{char}'."
)
def _assert_app_config_is_valid() -> None:
    """Verify that the app config's LDAP settings are complete and well-formed.

    Every one of the following keys must be present in the current app's
    config and hold a string value:

    - 'LDAP_HOSTNAME'
    - 'LDAP_BIND_USER_DN'
    - 'LDAP_BIND_USER_PASSWORD'
    - 'LDAP_BASE_DN'
    - 'LDAP_USERS_OU'
    - 'LDAP_GROUPS_OU'
    - 'LDAP_USER_OBJECT_CLASS'
    - 'LDAP_GROUP_OBJECT_CLASS'

    All of them except 'LDAP_HOSTNAME' and 'LDAP_BIND_USER_PASSWORD' are
    additionally passed to the matching format validator of this module.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If any of the LDAP config values are not of type string.
    MissingConfigKeyError
        If any of the above-mentioned LDAP config keys are not declared.
    InvalidStringFormatException
        If any of the validated LDAP config values are in an invalid format.
    """
    config = current_app.config

    mandatory_keys = (
        'LDAP_HOSTNAME',
        'LDAP_BIND_USER_DN',
        'LDAP_BIND_USER_PASSWORD',
        'LDAP_BASE_DN',
        'LDAP_USERS_OU',
        'LDAP_GROUPS_OU',
        'LDAP_USER_OBJECT_CLASS',
        'LDAP_GROUP_OBJECT_CLASS',
    )

    # Presence and type of each key are checked together, key by key.
    for key in mandatory_keys:
        if key not in config:
            raise MissingConfigKeyError(
                f"App config: key '{key}' was not provided."
            )
        if not isinstance(config[key], str):
            raise TypeError(
                f"Expected value of app config key '{key}' to be of type string."
            )

    _assert_is_valid_base_dn(config['LDAP_BASE_DN'])
    _assert_is_valid_bind_user_dn(config['LDAP_BIND_USER_DN'])
    _assert_is_valid_ou_dn(config['LDAP_USERS_OU'])
    _assert_is_valid_ou_dn(config['LDAP_GROUPS_OU'])
    _assert_is_valid_user_object_class(config['LDAP_USER_OBJECT_CLASS'])
    _assert_is_valid_group_object_class(config['LDAP_GROUP_OBJECT_CLASS'])
def get_connection() -> Connection:
    """Open a Connection to the LDAP server configured for the current app.

    The hostname, bind user DN and bind password are read from the app config
    keys 'LDAP_HOSTNAME', 'LDAP_BIND_USER_DN' and 'LDAP_BIND_USER_PASSWORD'.
    The Connection is created with auto_bind enabled.

    Returns
    -------
    ldap3.Connection
        A bound (i.e. authenticated), active Connection object to the server.

    Raises
    ------
    ldap3.core.exceptions.LDAPSocketOpenError
        If the server specified by the hostname cannot be reached.
    ldap3.core.exceptions.LDAPBindError
        If the bind credentials (user DN and/or password) are invalid.
    """
    config = current_app.config
    # Read all three settings before any ldap3 object is created.
    hostname, bind_dn, bind_password = (
        config['LDAP_HOSTNAME'],
        config['LDAP_BIND_USER_DN'],
        config['LDAP_BIND_USER_PASSWORD'],
    )

    server = Server(hostname, get_info=ALL)
    return Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
def _assert_is_valid_connection(connection: Connection) -> None:
    """Ensures that the connection is valid, open and bound.

    Parameters
    ----------
    connection : ldap3.Connection
        Connection object to an LDAP server.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If connection is not of type ldap3.Connection.
    InvalidConnectionException
        If the connection is closed.
        If the connection is not bound.
    """
    if not isinstance(connection, Connection):
        raise TypeError(f"Expected a Connection object but got '{type(connection)}'.")

    # In ldap3, 'Connection.open' is a method, so it is always truthy and
    # 'not connection.open' can never trigger. The socket state is exposed
    # through the boolean 'closed' attribute instead.
    if connection.closed:
        raise InvalidConnectionException("Connection is not open.")

    if not connection.bound:
        raise InvalidConnectionException("Connection is not bound.")
def ou_exists(connection: Connection, ou_dn: str) -> bool:
    """Report whether the given organizational unit is present in the DIT.

    The server is queried for 'organizationalUnit' entries below the base DN
    from the app config ('LDAP_BASE_DN'). Each result's 'ou' attribute is
    compared against the ou name taken from ou_dn.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    ou_dn : str
        DN of an organizational unit (ou) directly below the DIT's root entry.

    Returns
    -------
    bool
        True if a matching ou was found, and False otherwise.
    """
    _assert_is_valid_connection(connection)
    _assert_is_valid_ou_dn(ou_dn)

    # The first RDN is 'ou=<NAME>'; cut off the three-character prefix.
    first_rdn, _, _ = ou_dn.partition(',')
    wanted_name = first_rdn[3:]

    reader = Reader(
        connection,
        ObjectDef('organizationalUnit', connection),
        current_app.config['LDAP_BASE_DN'],
    )
    return any(entry.ou == wanted_name for entry in reader.search())
def create_ou(connection: Connection, ou_dn: str) -> None:
    """Add an 'organizationalUnit' entry at the given DN.

    The connection and the DN are validated first, and the entry is only
    added when ou_exists() reports that it is not present yet.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    ou_dn : str
        DN of an organizational unit (ou) directly below the DIT's root entry.

    Returns
    -------
    None

    Raises
    ------
    EntryExistsException
        If an entry for the specified ou already exists in the DIT.
    """
    _assert_is_valid_connection(connection)
    _assert_is_valid_ou_dn(ou_dn)

    already_present = ou_exists(connection, ou_dn)
    if not already_present:
        connection.add(ou_dn, 'organizationalUnit')
        return

    raise EntryExistsException(f"Cannot create '{ou_dn}': entry already exists.")
def get_user(connection: Connection, uid: str) -> User:
    """Retrieves the user with the specified uid (username) from the LDAP server.

    The DIT is queried for an entry of class 'inetOrgPerson' at the DN
    'uid=<uid>,<LDAP_USERS_OU>', where LDAP_USERS_OU is taken from the app
    config. The retrieved attributes are parsed and used to create a User
    object.

    The following attributes are expected to be set for the entry:
    'cn', 'sn', 'displayName', 'uid', 'userPassword', 'jpegPhoto', 'mail'.

    Parameters
    ----------
    connection : ldap3.Connection
        Bound Connection object to an LDAP server.
    uid : str
        Username of the user to be retrieved.

    Returns
    -------
    lumi2.usermodel.User
        The User object representing the retrieved user.

    Raises
    ------
    EntryNotFoundException
        If no user with the specified uid is found.
    AttributeNotFoundException
        If one of the above-mentioned attributes for the user is unset.
    TypeError
        If uid is not of type string.
    InvalidAttributeException
        If an attribute's value is in an invalid format.
    """
    _assert_is_valid_connection(connection)
    if not isinstance(uid, str):
        raise TypeError(f"Expected a string but got: '{type(uid)}'.")

    # NOTE(review): uid is concatenated into the DN without escaping or
    # validation; callers are presumably expected to pass a vetted username.
    user_dn = "uid=" + uid + ',' + current_app.config['LDAP_USERS_OU']

    required_attributes = [
        'cn',
        'sn',
        'displayName',
        'uid',
        'userPassword',
        'jpegPhoto',
        'mail',
    ]
    connection.search(
        user_dn, '(objectclass=inetOrgPerson)', attributes=required_attributes,
    )
    if not connection.entries:
        raise EntryNotFoundException(f"No such user found: '{user_dn}'.")
    entry = connection.entries[0]

    # Convert entry to JSON and load attributes into a dict
    attributes = json.loads(entry.entry_to_json())['attributes']

    # Treat a missing key and an empty value list alike as "unset", so that
    # the [0] lookups below cannot fail with an IndexError.
    for attribute in required_attributes:
        if not attributes.get(attribute):
            raise AttributeNotFoundException(
                f"Attribute '{attribute}' not set in entry '{user_dn}'."
            )

    username = attributes['uid'][0]
    email = attributes['mail'][0]
    first_name = attributes['cn'][0]
    last_name = attributes['sn'][0]
    display_name = attributes['displayName'][0]

    # Retrieve base64-encoded password hash prefixed with '{SHA512}'
    password_hash = attributes['userPassword'][0]
    expected_hash_type = '{SHA512}'
    if (
        not isinstance(password_hash, str)
        or not password_hash.startswith(expected_hash_type)
    ):
        # The stored hash is deliberately left out of the message so that it
        # does not end up in logs or error pages.
        raise InvalidAttributeException(
            f"Unexpected password hash in entry '{user_dn}': expected "
            f"a value prefixed with '{expected_hash_type}'."
        )
    # Strip prefix
    password_hash = password_hash[len(expected_hash_type):]

    # Decode base64-encoded picture and create a PIL Image object. Any
    # structural or decoding problem is reported as an invalid attribute.
    try:
        picture_encoded = attributes['jpegPhoto'][0]['encoded']
        picture = Image.open(BytesIO(b64decode(picture_encoded)))
    except (TypeError, KeyError, ValueError, OSError) as err:
        raise InvalidAttributeException(
            f"Invalid 'jpegPhoto' value in entry '{user_dn}'."
        ) from err

    return User(
        username, password_hash, email,
        first_name, last_name, display_name,
        picture
    )