from flask import request, render_template, flash, redirect, \
|
|
url_for, Blueprint, g
|
|
from flask_login import current_user, login_user, \
|
|
logout_user, login_required
|
|
from ldap3 import MODIFY_REPLACE
|
|
from ldap3.core.exceptions import LDAPBindError
|
|
from accounts import login_manager, db, ldap
|
|
from accounts.auth.models import User, LoginForm, get_ldap_connection
|
|
from email_validator import validate_email, EmailNotValidError
|
|
|
|
|
|
# Blueprint on which the routes and the before-request hook in this module are registered.
auth = Blueprint('auth', __name__)
|
|
|
|
|
|
@login_manager.user_loader
def load_user(id):
    """Load the ``User`` for a stored user id (Flask-Login ``user_loader``).

    Args:
        id: The user id handed over by Flask-Login; converted with ``int``.

    Returns:
        The result of ``User.query.get`` for the integer id, or ``None`` when
        the id cannot be converted to an integer.
    """
    # Flask-Login expects ``None`` (not an exception) for an invalid id, so a
    # stale or tampered session value must not surface as a server error.
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        return None
    return User.query.get(user_id)
|
|
|
|
|
|
@auth.before_request
def get_current_user():
    """Store the Flask-Login ``current_user`` proxy on ``g.user`` before each request."""
    g.user = current_user
|
|
|
|
|
|
@auth.route('/')
@login_required
def home():
    """Render ``profile.j2`` with the logged-in user's dictionary as ``user``."""
    profile = current_user.get_user_dict()
    return render_template('profile.j2', user=profile)
|
|
|
|
|
|
@auth.route('/update/email', methods=['POST'])
@login_required
def update_email():
    """Replace the ``mail`` attribute of an LDAP entry with a validated address.

    Form fields:
        email: The new address; checked with ``validate_email``.
        dn: Distinguished name of the entry to modify (required).

    Returns:
        A plain-text status: ``'Success'``, ``'Email cannot be empty'``,
        ``'Invalid email address'``, or ``'Error'`` when the directory
        reports that the modification failed.
    """
    # NOTE(review): ``dn`` comes straight from the client and is not compared
    # with the logged-in user's own entry here -- confirm that the LDAP
    # connection or another layer prevents editing someone else's entry.
    # ``.get`` lets a missing field reach the empty check below instead of
    # aborting the request before it is evaluated.
    email = request.form.get('email')
    dn = request.form['dn']

    if not email:
        return 'Email cannot be empty'

    try:
        valid = validate_email(email)
    except EmailNotValidError as e:
        # The exception message is human-readable.
        print(str(e))
        return 'Invalid email address'

    # Store the form returned by the validator rather than the raw input.
    conn = get_ldap_connection()
    # A falsy result means the directory rejected the change (assumes an
    # ldap3 synchronous strategy; other strategies return truthy values).
    if not conn.modify(dn, {'mail': [(MODIFY_REPLACE, [valid.email])]}):
        return 'Error'
    return 'Success'
|
|
|
|
|
|
@auth.route('/update/name', methods=['POST'])
@login_required
def update_name():
    """Replace the ``givenName`` and ``sn`` attributes of an LDAP entry.

    Form fields:
        firstName: New given name.
        lastName: New surname.
        dn: Distinguished name of the entry to modify (required).

    Returns:
        A plain-text status: ``'Success'``, ``'Name cannot be empty'``, or
        ``'Error'`` when the directory reports that the modification failed.
    """
    # NOTE(review): ``dn`` is client-supplied and is not checked against the
    # logged-in user here -- confirm authorization is enforced elsewhere.
    # ``.get`` lets a missing field reach the empty check below instead of
    # aborting the request before it is evaluated.
    first_name = request.form.get('firstName')
    last_name = request.form.get('lastName')
    dn = request.form['dn']

    if not first_name or not last_name:
        return 'Name cannot be empty'

    changes = {
        'givenName': [(MODIFY_REPLACE, [first_name])],
        'sn': [(MODIFY_REPLACE, [last_name])],
    }
    conn = get_ldap_connection()
    # A falsy result means the directory rejected the change (assumes an
    # ldap3 synchronous strategy; other strategies return truthy values).
    if not conn.modify(dn, changes):
        return 'Error'
    return 'Success'
|
|
|
|
|
|
@auth.route('/update/username', methods=['POST'])
@login_required
def update_username():
    """Replace the ``uid`` attribute of an LDAP entry.

    Form fields:
        userName: New value for ``uid``.
        dn: Distinguished name of the entry to modify (required).

    Returns:
        A plain-text status: ``'Success'``, ``'Username cannot be empty'``, or
        ``'Error'`` when the directory reports that the modification failed.
    """
    # NOTE(review): ``dn`` is client-supplied and is not checked against the
    # logged-in user here -- confirm authorization is enforced elsewhere.
    # NOTE(review): only the directory is updated; the local ``User.username``
    # used by the login and password routes is left unchanged -- verify the
    # two cannot drift apart.
    # ``.get`` lets a missing field reach the empty check below instead of
    # aborting the request before it is evaluated.
    user_name = request.form.get('userName')
    dn = request.form['dn']

    if not user_name:
        return 'Username cannot be empty'

    conn = get_ldap_connection()
    # A falsy result means the directory rejected the change (assumes an
    # ldap3 synchronous strategy; other strategies return truthy values).
    if not conn.modify(dn, {'uid': [(MODIFY_REPLACE, [user_name])]}):
        return 'Error'
    return 'Success'
|
|
|
|
|
|
@auth.route('/update/password', methods=['POST'])
@login_required
def update_password():
    """Change an LDAP entry's password after re-checking the current one.

    Form fields:
        currentPassword: The logged-in user's present password.
        newPassword: The desired password.
        confirmPassword: Must equal ``newPassword``.
        dn: Distinguished name of the entry to modify (required).

    Returns:
        A plain-text status: ``'Success'``, one of the validation prompts
        below, ``'Current password is incorrect'``, or ``'Error'`` when the
        directory reports that the password change failed.
    """
    # NOTE(review): the current password is verified for the logged-in user,
    # but the change is applied to the client-supplied ``dn`` -- confirm the
    # two are guaranteed to refer to the same entry.
    form = request.form
    # Default to '' so an absent field yields the same prompt as an empty one
    # instead of aborting the request.
    current_password = form.get('currentPassword', '')
    new_password = form.get('newPassword', '')
    confirm_password = form.get('confirmPassword', '')
    dn = form['dn']

    if current_password == '':
        return 'Please enter your current password'

    if new_password == '':
        return 'Please enter a new password'

    if confirm_password == '':
        return 'Please confirm your new password'

    if new_password != confirm_password:
        return 'Could not confirm new password, please make sure you typed it correctly'

    # Re-authenticate before allowing the change.
    try:
        User.try_login(current_user.username, current_password)
    except LDAPBindError:
        return 'Current password is incorrect'

    conn = get_ldap_connection()
    changed = conn.extend.standard.modify_password(
        user=dn, new_password=new_password)
    # A falsy result means the directory rejected the change (assumes an
    # ldap3 synchronous strategy; other strategies return truthy values).
    if not changed:
        return 'Error'
    return 'Success'
|
|
|
|
|
|
@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on a valid POST, authenticate via ``User.try_login``.

    An already-authenticated visitor is redirected to ``auth.home``. After a
    successful bind the matching ``User`` row is looked up (and created when
    absent), marked authenticated, committed, and logged in with Flask-Login.
    A failed bind or an unsubmitted/invalid form re-renders ``login.j2``.
    """
    if current_user.is_authenticated:
        flash('You are already logged in.')
        return redirect(url_for('auth.home'))

    form = LoginForm(request.form)

    # Not a validated submission: flash any validation errors and show the form.
    if not (request.method == 'POST' and form.validate()):
        if form.errors:
            flash(form.errors, 'danger')
        return render_template('login.j2', form=form)

    username = request.form.get('username')
    password = request.form.get('password')

    try:
        User.try_login(username, password)
    except LDAPBindError:
        flash('Invalid username or password. Please try again.', 'danger')
        return render_template('login.j2', form=form)

    account = User.query.filter(User.username == username).first()

    print(account)
    if account is None:
        # First successful login for this username: create the local row.
        account = User(username)
        db.session.add(account)
    account.authenticated = True
    db.session.commit()
    login_user(account, remember=form.remember_me.data)

    print('You have successfully logged in.')
    return redirect(url_for('auth.home'))
|
|
|
|
|
|
@auth.route('/logout')
@login_required
def logout():
    """Clear the user's ``authenticated`` flag, commit it, and end the session.

    Redirects to ``auth.home`` afterwards.
    """
    current_user.authenticated = False
    db.session.add(current_user)
    db.session.commit()
    logout_user()
    return redirect(url_for('auth.home'))
|