diff --git a/lumi2/ldap.py b/lumi2/ldap.py index 354139e..a1fd919 100644 --- a/lumi2/ldap.py +++ b/lumi2/ldap.py @@ -924,6 +924,52 @@ def delete_group(connection: Connection, group_cn: str) -> None: connection.delete(group_dn) +def update_group(connection: Connection, group: Group) -> None: + """Updates the specified Group on the LDAP server. + + Parameters + ---------- + connection : ldap3.Connection + Bound Connection object to an LDAP server. + group : lumi2.usermodel.Group + The Group for which the LDAP entry is to be updated on the server. + + Raises + ------ + TypeError + If group is not of type Group. + EntryNotFoundException + If the specified group does not exist in the DIT, or if a user who is a + member of the Group does not exist in the DIT. + """ + + _assert_is_valid_connection(connection) + if not isinstance(group, Group): + raise TypeError(f"Expected a lumi2.usermodel.Group but got: '{type(group)}'.") + + if not group_exists(connection, group.get_dn()): + raise EntryNotFoundException( + f"Failed to update group '{group.groupname}': no such entry found." + ) + + member_dn_list = [] + for user in group.members: + user_dn = user.get_dn() + if not user_exists(connection, user_dn): + raise EntryNotFoundException( + f"Failed to create group '{group.groupname}': no entry found for " \ + f"user '{user.username}'." + ) + member_dn_list.append(user_dn) + + connection.modify( + group.get_dn(), + { + "uniqueMember": [(MODIFY_REPLACE, member_dn_list)], + } + ) + + def get_group(connection: Connection, group_cn: str) -> Group: """Retrieves the group with the specified CN (common name) from the LDAP server. diff --git a/tests/test_ldap.py b/tests/test_ldap.py index 5f9f98c..37046cb 100644 --- a/tests/test_ldap.py +++ b/tests/test_ldap.py @@ -322,6 +322,23 @@ def test_delete_group(app, connection): ll.delete_group(connection, "employees") +def test_update_group(app, connection): + with app.app_context(): + employees = ll.get_group(connection, "employees") + user0 = lu.User( + username="user0", + password_hash=lu.User.generate_password_hash("password"), + email="valid@example.com", + first_name="Test", + last_name="User", + ) + employees.members.add(user0) + with pytest.raises(ll.EntryNotFoundException): + ll.update_group(connection, employees) + ll.create_user(connection, user0) + ll.update_group(connection, employees) + assert user0 in ll.get_group(connection, "employees").members + def test_get_group(app, connection): with app.app_context():