import ldap as l from ldap3 import Server, Connection, ALL, MODIFY_REPLACE from flask import Flask, g, request, session, redirect, url_for, render_template from flask_simpleldap import LDAP from flask_bootstrap import Bootstrap from email_validator import validate_email, EmailNotValidError import os app = Flask(__name__) Bootstrap(app) app.secret_key = 'asdf' app.debug = True # Base app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication' app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST') app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN') app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME') app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD') # OpenLDAP app.config['LDAP_OBJECTS_DN'] = 'dn' app.config['LDAP_OPENLDAP'] = True app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))' ldap = LDAP(app) server = Server(app.config['LDAP_HOST']) conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True) @app.before_request def before_request(): g.user = None if 'user_id' in session: # This is where you'd query your database to get the user info. g.user = {} @app.route('/') @ldap.login_required def index(): user_dict = ldap.get_object_details(session['user_id']) if 'user_id' in session: user = {'dn': 'cn={},cn=usergroup,ou=users,dc=technicalincompetence,dc=club'.format(user_dict['cn'][0].decode('ascii')), 'firstName': user_dict['givenName'][0].decode('ascii'), 'lastName': user_dict['sn'][0].decode('ascii'), 'email': user_dict['mail'][0].decode('ascii'), 'userName': user_dict['uid'][0].decode('ascii'), } return render_template('profile.j2', user = user) @app.route('/login', methods=['GET', 'POST']) def login(): if g.user: return redirect(url_for('index')) if request.method == 'POST': user = request.form['user'] passwd = request.form['passwd'] test = ldap.bind_user(user, passwd) if test is None or passwd == '': return render_template('login.j2', error='Invalid credentials') else: session['user_id'] = request.form['user'] session['passwd'] = request.form['passwd'] return redirect('/') return render_template('login.j2') @ldap.login_required @app.route('/update/email', methods=['POST']) def update_email(): if request.method == 'POST': email = request.form['email'] dn = request.form['dn'] if email != None and len(email) > 0: try: # Validate. valid = validate_email(email) # Update with the normalized form. conn.modify(dn, {'mail': [(MODIFY_REPLACE, [valid.email])]}) return 'Success' except EmailNotValidError as e: # email is not valid, exception message is human-readable print(str(e)) return 'Invalid email address' return 'Email cannot be empty' @ldap.login_required @app.route('/update/name', methods=['POST']) def update_name(): if request.method == 'POST': firstName = request.form['firstName'] lastName = request.form['lastName'] dn = request.form['dn'] if (firstName != None and len(firstName) > 0) and (lastName != None and len(lastName) > 0): conn.modify(dn, {'givenName': [(MODIFY_REPLACE, [firstName])], 'sn': [(MODIFY_REPLACE, [lastName])]}) return 'Success' return 'Name cannot be empty' @ldap.login_required @app.route('/update/username', methods=['POST']) def update_username(): if request.method == 'POST': userName = request.form['userName'] dn = request.form['dn'] if userName != None and len(userName) > 0: conn.modify(dn, {'uid': [(MODIFY_REPLACE, [userName])]}) return 'Success' return 'Username cannot be empty' @ldap.login_required @app.route('/update/password', methods=['POST']) def update_password(): if request.method == 'POST': currentPassword = request.form['currentPassword'] newPassword = request.form['newPassword'] confirmPassword = request.form['confirmPassword'] dn = request.form['dn'] if currentPassword == '': return 'Please enter your current password' if newPassword == '': return 'Please enter a new password' if confirmPassword == '': return 'Please confirm your new password' if newPassword != confirmPassword: return 'Could not confirm new password, please make sure you typed it correctly' test = ldap.bind_user(session['user_id'], currentPassword) if test is None: return 'Current password is incorrect' else: conn.extend.standard.modify_password(user=dn, new_password=newPassword) return 'Success' return 'Error' @app.route('/logout') def logout(): session.pop('user_id', None) return redirect(url_for('index')) if __name__ == '__main__': app.run()