|
|
- import ldap as l
- from ldap3 import Server, Connection, ALL, MODIFY_REPLACE
- from ldap3.core.exceptions import LDAPBindError
- from flask import Flask, g, request, session, redirect, url_for, render_template, flash
- from flask_bs4 import Bootstrap
- from flask_sqlalchemy import SQLAlchemy
- from flask_login import LoginManager, login_manager, current_user, login_user, \
- logout_user, login_required
- from flask_wtf import FlaskForm
- from flask_cache_buster import CacheBuster
- from wtforms import StringField, PasswordField, BooleanField, SubmitField
- from wtforms.validators import DataRequired
- import yaml
- import datetime as dt
- import pytz
- import os
- import sqlite3
-
# ---------------------------------------------------------------------------
# Flask application and extension setup.
# Everything in this section runs once, at import time.
# ---------------------------------------------------------------------------
app = Flask(__name__)
Bootstrap(app)

# SECURITY: the session-signing key should come from the environment.  The
# historical placeholder is kept only as a fallback so existing deployments
# keep starting; set SECRET_KEY to a long random value in production.
app.secret_key = os.environ.get('SECRET_KEY', 'asdf')

# NOTE(review): debug mode is unconditionally enabled here.  Combined with
# app.run() this exposes the interactive debugger -- confirm this module is
# never served publicly in this configuration.
app.debug = True

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# SECURITY: same policy as SECRET_KEY above -- override via the environment.
app.config['WTF_CSRF_SECRET_KEY'] = os.environ.get('WTF_CSRF_SECRET_KEY', 'asdf')

# Base
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication'
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST')
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN')
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME')
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD')

# OpenLDAP
app.config['LDAP_OBJECTS_DN'] = 'dn'
app.config['LDAP_OPENLDAP'] = True
# %s is substituted with the login name when looking a user up.
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))'

# Login cookies
app.config['SESSION_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')
app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')

db = SQLAlchemy(app)

# LoginManager(app) already registers itself on the application, so no
# separate init_app() call is needed.
login_manager = LoginManager(app)
login_manager.login_view = 'login'

# NOTE(review): this runs before the User model (declared further down in
# this module) exists, so it presumably creates no tables -- verify that the
# user table is created elsewhere or move this call below the model.
db.create_all()

# Cache-busting: static files with these extensions get a content hash.
config = {
    'extensions': ['.js', '.css', '.csv'],
    'hash_size': 10
}

cache_buster = CacheBuster(config=config)
cache_buster.register_cache_buster(app)
-
-
# Time zone in which the final-countdown deadline is expressed.
eastern = pytz.timezone('US/Eastern')

# Site content (search, apps, games, countdowns) lives in a YAML file that is
# parsed once at import time with the safe loader.
with open('config/config.yaml') as config_file:
    yaml_data = yaml.safe_load(config_file)

search = yaml_data['search']
account_url = yaml_data['accounts']['account_url']

description = yaml_data['description']
game_description = yaml_data['game_description']

# Each countdown section is exposed only when its 'active' flag equals True;
# otherwise the corresponding module-level value stays None.
countdown_data = (
    yaml_data['countdown']
    if yaml_data['countdown']['active'] == True  # noqa: E712 - exact comparison kept
    else None
)

final_countdown_data = (
    yaml_data['final_countdown']
    if yaml_data['final_countdown']['active'] == True  # noqa: E712 - exact comparison kept
    else None
)

final_time = None
if final_countdown_data is not None:
    parsed_deadline = dt.datetime.strptime(
        final_countdown_data['timestamp'], '%B %d %Y %H:%M:%S%z'
    )
    # NOTE(review): the UTC offset parsed by %z is discarded and the wall-clock
    # value is re-interpreted as US/Eastern -- confirm this is intended.
    final_time = eastern.localize(parsed_deadline.replace(tzinfo=None))

# Only the values of the 'apps' / 'games' mappings are used; keys are dropped.
apps = list(yaml_data['apps'].values())


games = list(yaml_data['games'].values())
-
-
# Module-wide LDAP connection, bound at import time with the service account
# configured through LDAP_USERNAME / LDAP_PASSWORD.  It is used for directory
# searches by the User model.
server = Server(app.config['LDAP_HOST'])
conn = Connection(
    server,
    user=app.config['LDAP_USERNAME'],
    password=app.config['LDAP_PASSWORD'],
    auto_bind=True,
)
-
-
class User(db.Model):
    """Local database record for a user who authenticates against LDAP.

    Credentials are verified by LDAP (see ``try_login``); this table only
    tracks the username and whether the user is currently logged in.

    ``is_authenticated``, ``is_active`` and ``is_anonymous`` are properties,
    which is how the views in this module (and current Flask-Login) read
    them.  As plain methods, ``current_user.is_authenticated`` evaluated to a
    bound method and was therefore always truthy.
    """

    __tablename__ = 'user'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100))
    # Kept for schema compatibility only; never populated (see __init__).
    password = db.Column(db.String(128))
    authenticated = db.Column(db.Boolean, default=False)

    def __init__(self, username, password):
        """Create a record for ``username``.

        ``password`` is accepted for backward compatibility but is
        deliberately NOT stored: it is the user's LDAP password and must not
        be persisted in plaintext.
        """
        self.username = username
        self.password = None

    @staticmethod
    def _user_filter(username):
        """Return the LDAP search filter for ``username``, safely escaped."""
        # Imported locally so this block does not depend on the file header.
        from ldap3.utils.conv import escape_filter_chars

        # Escaping prevents LDAP filter injection through the login name.
        return app.config['LDAP_USER_OBJECT_FILTER'] % escape_filter_chars(username)

    @staticmethod
    def try_login(username, password):
        """Verify ``username`` / ``password`` against LDAP.

        Looks the user up with the shared service connection, then attempts
        a bind as that user's DN with the supplied password.

        Returns:
            None on success.

        Raises:
            LDAPBindError: if either credential is empty, the user does not
                exist, or the bind is rejected.
        """
        if not username or not password:
            raise LDAPBindError('username and password are required')

        conn.search(app.config['LDAP_BASE_DN'], User._user_filter(username), attributes=['*'])
        if not conn.entries:
            raise LDAPBindError('unknown user')

        user_conn = Connection(
            app.config['LDAP_HOST'], conn.entries[0].entry_dn, password, auto_bind=True
        )
        # The bind was only a credential check; release the connection.
        user_conn.unbind()

    @property
    def is_authenticated(self):
        """True when the stored ``authenticated`` flag is set."""
        return bool(self.authenticated)

    @property
    def is_active(self):
        """Every account is treated as active."""
        return True

    @property
    def is_anonymous(self):
        """Stored users are never anonymous."""
        return False

    def get_id(self):
        """Return the primary key as a string, as Flask-Login expects."""
        return str(self.id)

    def get_user_dict(self):
        """Return a dict describing this user, filled from the LDAP entry.

        Keys: ``dn``, ``firstName``, ``lastName``, ``email``, ``userName``.
        If the directory search returns no entry, every field except
        ``userName`` is left as an empty string.
        """
        user = {
            'dn': '',
            'firstName': '',
            'lastName': '',
            'email': '',
            'userName': self.username,
        }

        conn.search(app.config['LDAP_BASE_DN'], User._user_filter(self.username), attributes=['*'])
        if not conn.entries:
            return user

        entry = conn.entries[0]
        user['dn'] = entry.entry_dn
        user['firstName'] = entry.givenName.value
        user['lastName'] = entry.sn.value
        user['email'] = entry.mail.value

        return user


# Tables can only be created once the model class above has been declared.
# create_all() skips tables that already exist, so repeating it is harmless.
with app.app_context():
    db.create_all()
-
-
class LoginForm(FlaskForm):
    """Sign-in form: required username and password, optional remember-me."""

    username = StringField(label='Username', validators=[DataRequired()])
    password = PasswordField(label='Password', validators=[DataRequired()])
    remember_me = BooleanField(label='Remember Me')
    submit = SubmitField(label='Sign In')
-
-
@login_manager.user_loader
def load_user(id):
    """Flask-Login user loader: return the User with primary key ``id``.

    Returns None when ``id`` is not a valid integer or no such row exists,
    so a malformed session value yields an anonymous user instead of an
    unhandled exception.
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        return None
    return User.query.get(user_id)
-
-
@app.before_request
def get_current_user():
    """Expose Flask-Login's ``current_user`` as ``g.user`` for each request."""
    g.user = current_user
-
-
@app.route('/frame')
def frame():
    """Render ``frame.j2`` with the configured apps and games lists."""
    context = {'apps': apps, 'games': games}
    return render_template('frame.j2', **context)
-
-
@app.route('/')
def index():
    """Render the landing page.

    While a final countdown is configured and its deadline has not passed,
    the final-countdown page is shown instead.  Otherwise ``index.j2`` is
    rendered; ``countdown`` is passed to the template only when a regular
    countdown is active.
    """
    # Take the current time directly in US/Eastern.  Localizing a naive
    # datetime.now() would mislabel the server's local time as Eastern
    # whenever the server runs in another time zone.
    current_time = dt.datetime.now(eastern)

    if final_countdown_data is not None and final_time >= current_time:
        return render_template('final_countdown.j2', final_countdown=final_countdown_data)

    context = {
        'apps': apps,
        'search': search,
        'account_url': account_url,
        'description': description,
    }
    if countdown_data is not None:
        context['countdown'] = countdown_data
    return render_template('index.j2', **context)
-
-
@app.route('/games')
def game():
    """Render the games page.

    While a final countdown is configured and its deadline has not passed,
    the final-countdown page is shown instead.  Otherwise ``games.j2`` is
    rendered; ``countdown`` is passed only when a regular countdown is
    active, and ``game_list`` only for a logged-in user.
    """
    # Take the current time directly in US/Eastern.  Localizing a naive
    # datetime.now() would mislabel the server's local time as Eastern
    # whenever the server runs in another time zone.
    current_time = dt.datetime.now(eastern)

    if final_countdown_data is not None and final_time >= current_time:
        return render_template('final_countdown.j2', final_countdown=final_countdown_data)

    context = {
        'games': games,
        'search': search,
        'account_url': account_url,
        'description': game_description,
    }
    if countdown_data is not None:
        context['countdown'] = countdown_data
    if current_user.is_authenticated:
        context['game_list'] = generate_game_list(current_user.username)
    return render_template('games.j2', **context)
-
-
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the sign-in form and process LDAP logins.

    GET renders the form.  A valid POST checks the credentials against LDAP;
    on success the local User row is created if missing, marked
    authenticated, and the browser is redirected to the games page.  A
    rejected bind or an invalid form re-renders the form with a flashed
    message.
    """
    if current_user.is_authenticated:
        flash('You are already logged in.')
        return redirect(url_for('index'))

    form = LoginForm(request.form)

    if request.method == 'POST' and form.validate():
        username = form.username.data
        password = form.password.data

        # Credentials are never printed or logged.
        try:
            User.try_login(username, password)
        except LDAPBindError:
            flash(
                'Invalid username or password. Please try again.',
                'danger')
            return render_template('login.j2', form=form)

        user = User.query.filter(User.username == username).first()
        if user is None:
            user = User(username, password)
            db.session.add(user)
        user.authenticated = True
        db.session.commit()
        login_user(user, remember=form.remember_me.data)

        app.logger.info('User %s logged in.', username)
        return redirect(url_for('game'))

    if form.errors:
        flash(form.errors, 'danger')

    return render_template('login.j2', form=form)
-
-
@app.route('/add', methods=['POST'])
@login_required
def add_game():
    """Store a game-in-progress for the logged-in user.

    Reads ``game_title`` and ``game_link`` from the POSTed form.

    Returns:
        ``'Success'`` after the row is inserted, ``'Error'`` when either
        field is empty.
    """
    game_title = request.form['game_title']
    game_link = request.form['game_link']

    # Validate before opening the database.
    if not game_title or not game_link:
        return 'Error'

    games_conn = sqlite3.connect('config/games_in_progress.db')
    try:
        games_conn.execute(
            "INSERT INTO games (user_id, game_title, game_link) VALUES (?, ?, ?)",
            (current_user.username, game_title, game_link),
        )
        games_conn.commit()
    finally:
        # Close the connection even if the insert raises.
        games_conn.close()

    return 'Success'
-
-
@app.route('/delete', methods=['POST'])
@login_required
def delete_game():
    """Delete one of the logged-in user's games-in-progress.

    Reads ``game_id`` from the POSTed form.  The DELETE is restricted to
    rows whose ``user_id`` matches the current user.

    Returns:
        ``'Success'`` after the DELETE has run, ``'Error'`` when ``game_id``
        is empty.
    """
    game_id = request.form['game_id']

    # Validate before opening the database.
    if not game_id:
        return 'Error'

    games_conn = sqlite3.connect('config/games_in_progress.db')
    try:
        games_conn.execute(
            "DELETE FROM games WHERE id=? AND user_id=?",
            (game_id, current_user.username),
        )
        games_conn.commit()
    finally:
        # Close the connection even if the delete raises.
        games_conn.close()

    return 'Success'
-
-
def generate_game_list(username, db_path='config/games_in_progress.db'):
    """Return every row of the ``games`` table belonging to ``username``.

    Args:
        username: value matched against the ``user_id`` column.
        db_path: SQLite database file to read; defaults to the application's
            games database.

    Returns:
        A list of row tuples (all columns), empty when the user has no games.
    """
    games_conn = sqlite3.connect(db_path)
    try:
        cursor = games_conn.execute('SELECT * FROM games WHERE user_id=?', (username,))
        return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        games_conn.close()
-
-
@app.route('/logout')
@login_required
def logout():
    """Clear the user's stored ``authenticated`` flag and end the session.

    The flag is persisted before Flask-Login's ``logout_user()`` is called;
    the browser is then redirected to the games page.
    """
    departing_user = current_user
    departing_user.authenticated = False

    session_ = db.session
    session_.add(departing_user)
    session_.commit()

    logout_user()
    return redirect(url_for('game'))
-
-
if __name__ == '__main__':
    # extra_files must be an iterable of paths.  A bare string is iterated
    # character by character, so the reloader never watched the config file.
    app.run(extra_files=['config/config.yaml'])
|