lumi2/lumi2/usermanager.py

461 lines
12 KiB
Python

"""Views for lumi2."""
from pathlib import Path
import shutil
from tempfile import TemporaryFile
from json import loads, dumps, JSONDecodeError
from flask import (
Blueprint, render_template, abort, request, flash, redirect, url_for,
current_app, g, send_from_directory
)
from PIL import Image, UnidentifiedImageError
from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileAllowed
from wtforms import (
ValidationError, StringField, PasswordField, SubmitField, HiddenField
)
from wtforms.validators import InputRequired, Email, EqualTo
from lumi2.auth import login_required
import lumi2.ldap as ldap
from lumi2.usermodel import User, Group
from lumi2.exceptions import InvalidStringFormatException, InvalidImageException
# Blueprint on which the user- and group-management views below are registered.
bp = Blueprint('usermanager', __name__)
@bp.before_app_first_request
def _init_static_images():
    """Purge and recreate the static image cache for all users.

    The cache directory is ``<instance_path>/protected/images/users``. An
    existing directory is removed, a fresh one is created, and every user
    returned by ``ldap.get_users()`` is asked to generate its static images.
    """
    path_to_image_cache = (
        Path(current_app.instance_path) / "protected" / "images" / "users"
    )
    if path_to_image_cache.is_dir():
        shutil.rmtree(path_to_image_cache)
    path_to_image_cache.mkdir(parents=True)

    # NOTE(review): this hook is registered before _initialize_ldap_dit, so on
    # a fresh directory the users OU may not exist yet when get_users() runs --
    # confirm that ldap.get_users() copes with that.
    conn = ldap.get_connection()
    try:
        for user in ldap.get_users(conn):
            user._generate_static_images()
    finally:
        # Release the LDAP connection even if listing users or generating
        # images fails.
        conn.unbind()
@bp.before_app_first_request
def _initialize_ldap_dit():
    """Create the OUs for users and groups if they do not exist yet.

    The OU DNs are read from the ``LDAP_USERS_OU`` and ``LDAP_GROUPS_OU``
    application config keys.
    """
    conn = ldap.get_connection()
    try:
        for config_key in ('LDAP_USERS_OU', 'LDAP_GROUPS_OU'):
            ou_dn = current_app.config[config_key]
            if not ldap.ou_exists(conn, ou_dn):
                ldap.create_ou(conn, ou_dn)
    finally:
        # Release the LDAP connection even if a lookup or creation fails.
        conn.unbind()
@bp.route('/protected/<path:path_to_file>')
@login_required
def protected(path_to_file):
    """Serve a file from the instance's ``protected`` folder.

    The view is wrapped in ``login_required``, so the file is only returned
    to clients that are logged in.
    """
    protected_root = Path(current_app.instance_path) / "protected"
    return send_from_directory(protected_root, path_to_file)
@bp.route('/')
def index():
    """Home page view.

    Anonymous visitors get the plain index template. Authenticated visitors
    additionally get the number of users and groups found in LDAP. Responds
    with HTTP 500 if no LDAP connection can be obtained.
    """
    if not g.is_authenticated:
        return render_template('usermanager/index.html')

    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        user_count = len(ldap.get_users(conn))
        group_count = len(ldap.get_groups(conn))
    finally:
        # Release the LDAP connection even if one of the queries fails.
        conn.unbind()

    return render_template(
        'usermanager/index.html',
        user_count=user_count,
        group_count=group_count,
    )
@bp.route("/users/view/<string:username>")
@login_required
def user_view(username: str):
    """Detail view for a specific User.

    Shows the user's information. Responds with HTTP 500 if no LDAP
    connection can be obtained and with HTTP 404 if the user does not exist.
    """
    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        user = ldap.get_user(conn, username)
    except ldap.EntryNotFoundException:
        abort(404)
    finally:
        # Runs on success, on the 404 abort and on unexpected errors alike,
        # so the LDAP connection is always released.
        conn.unbind()

    return render_template('usermanager/user_view.html', user=user)
@bp.route("/users/list")
@login_required
def user_list():
    """Display a list of all users.

    Responds with HTTP 500 if connecting to LDAP or fetching the users fails.
    """
    try:
        conn = ldap.get_connection()
        try:
            users = ldap.get_users(conn)
        finally:
            # Release the connection even if fetching the users fails.
            conn.unbind()
    except Exception:
        abort(500)

    return render_template(
        'usermanager/user_list.html',
        users=users,
    )
class UserUpdateForm(FlaskForm):
    """Form for editing an existing user's details.

    ``validate_password`` and ``validate_picture`` are picked up by WTForms
    as inline validators because their names match the ``password`` and
    ``picture`` fields, so they are not listed again in those fields'
    validator lists. ``validate_name`` matches no field name and is therefore
    attached explicitly to each name field.
    """

    @staticmethod
    def validate_name(form, field) -> None:
        """Reject a non-empty value that is not a valid name for a User."""
        if field.data:
            try:
                User.assert_is_valid_name(field.data)
            except InvalidStringFormatException as e:
                raise ValidationError(str(e)) from e

    @staticmethod
    def validate_password(form, field) -> None:
        """Reject a non-empty value that is not a valid password for a User."""
        if field.data:
            try:
                User.assert_is_valid_password(field.data)
            except InvalidStringFormatException as e:
                raise ValidationError(str(e)) from e

    @staticmethod
    def validate_picture(form, field) -> None:
        """Reject an uploaded file that Pillow cannot identify as a JPEG."""
        if not (field.data and field.data.filename):
            return
        try:
            Image.open(field.data, formats=['JPEG'])
        except UnidentifiedImageError as e:
            raise ValidationError(
                "Invalid JPEG file. It may be corrupted."
            ) from e
        finally:
            # Rewind the upload so later consumers read it from the start.
            field.data.seek(0)

    email = StringField(
        'Email',
        [InputRequired(), Email()],
    )
    first_name = StringField(
        'First Name',
        [InputRequired(), validate_name],
    )
    last_name = StringField(
        'Last Name',
        [InputRequired(), validate_name],
    )
    display_name = StringField(
        'Nickname',
        [InputRequired(), validate_name],
    )
    password = PasswordField(
        'Password',
        [
            EqualTo('password_confirmation', message='Passwords must match'),
            # validate_password runs as the inline validator for this field;
            # listing it here as well would run it twice and duplicate its
            # error message.
        ],
    )
    password_confirmation = PasswordField(
        'Password (repeat)',
    )
    picture = FileField(
        'Picture',
        [FileAllowed(['jpg', 'jpeg'], 'JPEG images only.')],
    )
    submit = SubmitField(
        'Update',
    )
class UserCreationForm(UserUpdateForm):
    """Form for creating a new user.

    Extends ``UserUpdateForm`` with a ``username`` field, makes the password
    mandatory and the nickname optional.

    ``validate_username`` and the inherited ``validate_password`` are picked
    up by WTForms as inline validators because their names match the
    ``username`` and ``password`` fields, so they are not listed again in
    those fields' validator lists.
    """

    @staticmethod
    def validate_username(form, field) -> None:
        """Reject usernames that are malformed or already exist in LDAP."""
        try:
            User.assert_is_valid_username(field.data)
        except InvalidStringFormatException as e:
            raise ValidationError(str(e)) from e

        new_user_dn = f"uid={field.data}," + current_app.config['LDAP_USERS_OU']
        conn = ldap.get_connection()
        try:
            username_taken = ldap.user_exists(conn, new_user_dn)
        finally:
            # Always release the connection, including when the name is taken
            # or the lookup fails.
            conn.unbind()
        if username_taken:
            raise ValidationError("Username is taken.")

    username = StringField(
        'Username',
        [
            InputRequired('Please enter a username.'),
            # validate_username runs as the inline validator for this field;
            # listing it here as well would query LDAP twice and duplicate
            # its error message.
        ],
    )
    display_name = StringField(
        'Nickname',
        [UserUpdateForm.validate_name],
    )
    password = PasswordField(
        'Password',
        [
            EqualTo('password_confirmation', message='Passwords must match'),
            InputRequired('Please enter a password.'),
            # The inherited validate_password runs as the inline validator
            # for this field.
        ],
    )
    submit = SubmitField(
        'Create',
    )
@bp.route("/users/create", methods=("GET", "POST"))
@login_required
def user_create():
    """Creation view for a new User.

    Provides a form which can be used to enter the new user's details. On a
    valid submission the user is created in LDAP, their static images are
    generated, and the client is redirected to the new user's detail view.
    Responds with HTTP 500 if no LDAP connection can be obtained.
    """
    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        form = UserCreationForm()
        if form.validate_on_submit():
            upload = form.picture.data
            picture = (
                Image.open(upload, formats=['JPEG'])
                if upload and upload.filename
                else None
            )
            user = User(
                form.username.data,
                User.generate_password_hash(form.password.data),
                form.email.data,
                form.first_name.data,
                form.last_name.data,
                form.display_name.data if form.display_name.data else None,
                picture,
            )
            ldap.create_user(conn, user)
            user._generate_static_images(force=True)
            flash(f"User '{user.username}' was created.")
            return redirect(
                url_for('usermanager.user_view', username=user.username)
            )

        return render_template(
            'usermanager/user_edit.html',
            form=form,
            heading="Create a new user",
            is_update=False,
        )
    finally:
        # Release the LDAP connection on every exit path, including errors
        # raised while building or storing the user.
        conn.unbind()
@bp.route("/users/update/<string:username>", methods=("GET", "POST"))
@login_required
def user_update(username: str):
    """Update view for a specific User.

    Provides a form which can be used to edit that user's details. Only
    fields submitted with a non-empty value overwrite the stored value.
    Responds with HTTP 500 if no LDAP connection can be obtained and with
    HTTP 404 if the user does not exist.
    """
    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        try:
            user = ldap.get_user(conn, username)
        except ldap.EntryNotFoundException:
            abort(404)

        form = UserUpdateForm(obj=user)
        if form.validate_on_submit():
            if form.email.data:
                user.email = form.email.data
            if form.first_name.data:
                user.first_name = form.first_name.data
            if form.last_name.data:
                user.last_name = form.last_name.data
            if form.display_name.data:
                user.display_name = form.display_name.data
            if form.password.data:
                user.password_hash = User.generate_password_hash(
                    form.password.data
                )

            upload = form.picture.data
            picture_changed = bool(upload and upload.filename)
            if picture_changed:
                user.picture = Image.open(upload, formats=['JPEG'])

            ldap.update_user(conn, user)
            if picture_changed:
                # Regenerate the cached images only after the LDAP update has
                # succeeded (same order as in user_create), so a failed
                # update does not leave the cache showing an unsaved picture.
                user._generate_static_images(force=True)

            flash(f"Information for user '{user.username}' was updated.")
            return redirect(
                url_for('usermanager.user_view', username=user.username)
            )

        return render_template(
            'usermanager/user_edit.html',
            form=form,
            username=user.username,
            heading=f"Edit user: {user.username}",
            is_update=True,
        )
    finally:
        # Release the LDAP connection on every exit path (404, redirect,
        # form re-render and unexpected errors).
        conn.unbind()
@bp.route("/users/delete/<string:username>", methods=("GET", "POST"))
@login_required
def user_delete(username: str):
    """Deletion view for a specific User.

    Provides a form prompting for confirmation of the specified user's
    deletion. Groups in which the user is the sole member are listed as well,
    since they get deleted implicitly by the lumi2.ldap.delete_user()
    operation. Responds with HTTP 500 if no LDAP connection can be obtained
    and with HTTP 404 if the user does not exist.
    """
    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        try:
            user = ldap.get_user(conn, username)
        except ldap.EntryNotFoundException:
            abort(404)

        # Groups whose only member is this user.
        deleted_groups = {
            group.groupname
            for group in ldap.get_groups_of_user(conn, user)
            if len(group.members) == 1
        }

        if request.method == 'POST':
            ldap.delete_user(conn, user.username)
            # FIXME delete user's static image folder!!!
            # currently, the images are only purged on app restart
            flash(f"The user '{user.username}' was deleted.")
            for groupname in deleted_groups:
                flash(f"The group '{groupname}' was deleted.")
            return redirect(url_for('usermanager.user_list'))

        return render_template(
            'usermanager/user_delete.html',
            username=user.username,
            deleted_groups=deleted_groups,
        )
    finally:
        # Release the LDAP connection on every exit path; previously the GET
        # (confirmation page) path never unbound it.
        conn.unbind()
@bp.route("/groups/list")
@login_required
def group_list():
    """Display a list of all groups.

    Responds with HTTP 500 if connecting to LDAP or fetching the groups fails.
    """
    try:
        conn = ldap.get_connection()
        try:
            groups = ldap.get_groups(conn)
        finally:
            # Release the connection even if fetching the groups fails.
            conn.unbind()
    except Exception:
        abort(500)

    return render_template(
        'usermanager/group_list.html',
        groups=groups,
    )
@bp.route("/groups/create")
@login_required
def group_create():
    """Creation view for a new group.

    Shows a table allowing adding members to this group. Responds with
    HTTP 500 if no LDAP connection can be obtained.
    """
    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        users = ldap.get_users(conn)
    finally:
        # Release the LDAP connection even if fetching the users fails.
        conn.unbind()

    return render_template(
        'usermanager/group_create.html',
        users=users,
    )
@bp.route("/groups/update/<string:groupname>")
@login_required
def group_update(groupname: str):
    """Detail and Update view for a group.

    Shows a table allowing the modification of user memberships for this
    group. Responds with HTTP 500 if no LDAP connection can be obtained and
    with HTTP 404 if the group does not exist.
    """
    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        try:
            group = ldap.get_group(conn, groupname)
        except ldap.EntryNotFoundException:
            abort(404)
        members = set(group.members)
        non_members = set(ldap.get_users(conn)) - members
    finally:
        # Release the LDAP connection on every exit path.
        conn.unbind()

    return render_template(
        'usermanager/group_edit.html',
        groupname=group.groupname,
        members=members,
        non_members=non_members,
    )
@bp.route("/groups/delete/<string:groupname>", methods=("GET", "POST"))
@login_required
def group_delete(groupname: str):
    """Deletion view for a specific Group.

    Provides a form prompting for confirmation of the specified group's
    deletion. Responds with HTTP 500 if no LDAP connection can be obtained
    and with HTTP 404 if the group does not exist.
    """
    try:
        conn = ldap.get_connection()
    except Exception:
        abort(500)

    try:
        try:
            group = ldap.get_group(conn, groupname)
        except ldap.EntryNotFoundException:
            abort(404)

        if request.method == 'POST':
            ldap.delete_group(conn, group.groupname)
            flash(f"The group '{group.groupname}' was deleted.")
            return redirect(url_for('usermanager.group_list'))

        return render_template(
            'usermanager/group_delete.html',
            groupname=group.groupname,
        )
    finally:
        # Release the LDAP connection on every exit path; previously the GET
        # (confirmation page) path never unbound it.
        conn.unbind()