From 1cbf2ce99b6a9d364ecc58358758f26287239b28 Mon Sep 17 00:00:00 2001 From: Julian Lobbes Date: Thu, 10 Nov 2022 00:15:23 +0100 Subject: [PATCH] feat(ldap): add get_user function --- lumi2/ldap.py | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/lumi2/ldap.py b/lumi2/ldap.py index 44bf336..b18c38d 100644 --- a/lumi2/ldap.py +++ b/lumi2/ldap.py @@ -8,10 +8,15 @@ https://ldap3.readthedocs.io/en/latest/ """ from string import ascii_lowercase, ascii_uppercase, digits +import json +from base64 import b64decode, b64encode +from io import BytesIO from flask import current_app from ldap3 import Connection, Server, ALL, Reader, Writer, ObjectDef +from PIL import Image +from lumi2.usermodel import User from lumi2.exceptions import MissingConfigKeyError @@ -30,6 +35,20 @@ class EntryExistsException(Exception): pass +class EntryNotFoundException(Exception): + """Exception raised when an entry is unexpectedly not found.""" + pass + + +class AttributeNotFoundException(Exception): + """Exception raised when an entry's is unexpectedly not set.""" + pass + +class InvalidAttributeException(Exception): + """Exception raised when an entry's is unexpectedly not set.""" + pass + + def _assert_is_valid_base_dn(input_str) -> None: """Checks whether the input string is a valid LDAP base DN. @@ -532,3 +551,94 @@ def create_ou(connection: Connection, ou_dn: str) -> None: raise EntryExistsException(f"Cannot create '{ou_dn}': entry already exists.") connection.add(ou_dn, 'organizationalUnit') + + +def get_user(connection: Connection, uid: str) -> User: + """Retrieves the user with the specified uid (username) from the LDAP server. + + The DIT is queried for an entry of class 'inetOrgPerson' directly under the + users ou specified in the app config. + The retrieved attributes are parsed and used to create a User object. + The follwing attributes are expected to be set for the entry: + 'cn', 'sn', 'displayName', 'uid', 'userPassword', 'jpegPhoto', 'mail'. + + Parameters + ---------- + Connection : ldap3.Connection + Bound Connection object to an LDAP server. + uid : str + Username of the user to be retrieved. + + Returns + ------- + lumi2.usermodel.User + The User object representing the retrieved user. + + Raises + ------ + EntryNotFoundException + If no user with the specified uid is found. + AttributeNotFoundException + If one of the above-mentioned attributes for the user is unset. + TypeError + If uid is not of type string. + InvalidAttributeException + If an attribute's value is in an invalid format. + """ + + _assert_is_valid_connection(connection) + if not isinstance(uid, str): + raise TypeError(f"Expected a string but got: '{type(uid)}'.") + + user_dn = "uid=" + uid + ',' + current_app.config['LDAP_USERS_OU'] + required_attributes = [ + 'cn', + 'sn', + 'displayName', + 'uid', + 'password', + 'jpegPhoto', + 'mail', + ] + + connection.search( + user_dn, '(objectclass=inetOrgPerson)', attributes=required_attributes, + ) + if not connection.entries: + raise EntryNotFoundException(f"No such user found: '{user_dn}'.") + entry = connection.entries[0] + # Convert entry to JSON and load attributes into a dict + attributes = json.loads(entry.entry_to_json())['attributes'] + + for attribute in required_attributes: + if attribute not in attributes.keys(): + raise AttributeNotFoundException( + f"Attribute '{attribute}' not set in entry '{user_dn}'." + ) + + username = attributes['uid'][0] + email = attributes['mail'][0] + first_name = attributes['cn'][0] + last_name = attributes['sn'][0] + display_name = attributes['displayName'][0] + + # Retrieve base64-encoded password hash prefixed with '{SHA512}' + password_hash = attributes['userPassword'][0] + expected_hash_type = '{SHA512}' + if not password_hash.startswith(expected_hash_type): + raise InvalidAttributeException( + f"Unexpected password hash in entry '{user_dn}': expected " \ + f"'{expected_hash_type}' but got: '{password_hash}'." + ) + # Strip prefix + password_hash = password_hash[len(expected_hash_type):] + + picture_encoded = attributes['jpegPhoto'][0]['encoded'] + # Decode base64-encoded picture and create a PIL Image object + picture = Image.open(BytesIO(b64decode(picture_encoded))) + + return User( + username, password_hash, email, + first_name, last_name, display_name, + picture + )