You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

288 lines
8.7 KiB

import ldap as l
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE
from ldap3.core.exceptions import LDAPBindError
from flask import Flask, g, request, session, redirect, url_for, render_template, flash
from flask_bs4 import Bootstrap
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, login_manager, current_user, login_user, \
logout_user, login_required
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired
import yaml
import datetime as dt
import pytz
import os
import sqlite3
app = Flask(__name__)
Bootstrap(app)
app.secret_key = 'asdf'
app.debug = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['WTF_CSRF_SECRET_KEY'] = 'asdf'
# Base
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication'
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST')
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN')
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME')
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD')
# OpenLDAP
app.config['LDAP_OBJECTS_DN'] = 'dn'
app.config['LDAP_OPENLDAP'] = True
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))'
# Login cookies
app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.init_app(app)
login_manager.login_view = 'login'
db.create_all()
eastern = pytz.timezone('US/Eastern')
with open('config/config.yaml') as f:
yaml_data = yaml.load(f, Loader=yaml.SafeLoader)
search = yaml_data['search']
account_url = yaml_data['accounts']['account_url']
description = yaml_data['description']
game_description = yaml_data['game_description']
countdown_data = None
if yaml_data['countdown']['active'] == True:
countdown_data = yaml_data['countdown']
final_countdown_data = None
final_time = None
if yaml_data['final_countdown']['active'] == True:
final_countdown_data = yaml_data['final_countdown']
final_time = eastern.localize(dt.datetime.strptime(final_countdown_data['timestamp'], '%B %d %Y %H:%M:%S%z').replace(tzinfo=None))
apps = []
for itm in yaml_data['apps'].items():
apps.append(itm[1])
games = []
for itm in yaml_data['games'].items():
games.append(itm[1])
server = Server(app.config['LDAP_HOST'])
conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True)
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100))
password = db.Column(db.String(128))
authenticated = db.Column(db.Boolean, default=False)
def __init__(self, username, password):
self.username = username
self.password = password
@staticmethod
def try_login(username, password):
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % username, attributes=['*'])
if len(conn.entries) > 0:
Connection(app.config['LDAP_HOST'], conn.entries[0].entry_dn, password, auto_bind=True)
return
raise LDAPBindError
def is_authenticated(self):
return self.authenticated
def is_active(self):
return True
def is_anonymous(self):
return False
def get_id(self):
return self.id
def get_user_dict(self):
user = {'dn': '',
'firstName': '',
'lastName': '',
'email': '',
'userName': self.username,
}
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % self.username, attributes=['*'])
user['dn'] = conn.entries[0].entry_dn
user['firstName'] = conn.entries[0].givenName.value
user['lastName'] = conn.entries[0].sn.value
user['email'] = conn.entries[0].mail.value
return user
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()])
remember_me = BooleanField('Remember Me')
submit = SubmitField('Sign In')
@login_manager.user_loader
def load_user(id):
return User.query.get(int(id))
@app.before_request
def get_current_user():
g.user = current_user
@app.route('/')
def index():
current_time = eastern.localize(dt.datetime.now())
if final_countdown_data != None:
if (final_time - current_time).days > -1:
return render_template('final_countdown.j2', final_countdown = final_countdown_data)
if countdown_data != None:
return render_template('index.j2', apps = apps, search = search, account_url = account_url, description = description, countdown = countdown_data)
return render_template('index.j2', apps = apps, search = search, account_url = account_url, description = description)
@app.route('/games')
def game():
current_time = eastern.localize(dt.datetime.now())
if final_countdown_data != None:
if (final_time - current_time).days > -1:
return render_template('final_countdown.j2', final_countdown = final_countdown_data)
if countdown_data != None:
return render_template('games.j2', apps = games, search = search, account_url = account_url, description = game_description, countdown = countdown_data)
if current_user.is_authenticated:
return render_template('games.j2', apps = games, search = search, account_url = account_url, description = game_description, user = current_user.get_user_dict(), game_list = generate_game_list())
return render_template('games.j2', apps = games, search = search, account_url = account_url, description = game_description)
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
flash('You are already logged in.')
return redirect(url_for('auth.home'))
form = LoginForm(request.form)
print(form)
print(request.method)
if request.method == 'POST' and form.validate():
username = request.form.get('username')
password = request.form.get('password')
print(username)
print(password)
try:
User.try_login(username, password)
except LDAPBindError:
flash(
'Invalid username or password. Please try again.',
'danger')
return render_template('login.j2', form=form)
user = User.query.filter(User.username == username).first()
print(user)
if user is None:
user = User(username, password)
db.session.add(user)
user.authenticated = True
db.session.commit()
login_user(user, remember=form.remember_me.data)
print('You have successfully logged in.')
return redirect('/games')
if form.errors:
flash(form.errors, 'danger')
return render_template('login.j2', form=form)
@app.route('/add', methods=['POST'])
@login_required
def add_game():
if request.method == 'POST':
game_title = request.form['game_title']
game_link = request.form['game_link']
conn = sqlite3.connect('config/games_in_progress.db')
c = conn.cursor()
if game_title is not None and len(game_title) > 0 and game_link is not None and len(game_link) > 0:
c.execute("INSERT INTO games (user_id, game_title, game_link) VALUES (?, ?, ?)", (current_user.username, game_title, game_link,))
conn.commit()
conn.close()
return 'Success'
conn.commit()
conn.close()
return 'Error'
@app.route('/delete', methods=['POST'])
@login_required
def delete_game():
if request.method == 'POST':
game_id = request.form['game_id']
conn = sqlite3.connect('config/games_in_progress.db')
c = conn.cursor()
if game_id is not None and len(game_id) > 0:
c.execute("DELETE FROM games WHERE id=? AND user_id=?", (game_id, session['user_id'],))
conn.commit()
conn.close()
return 'Success'
conn.commit()
conn.close()
return 'Error'
def generate_game_list():
conn = sqlite3.connect('config/games_in_progress.db')
c = conn.cursor()
if current_user.is_authenticated:
c.execute('SELECT * FROM games WHERE user_id=?', (current_user.username, ))
rows = c.fetchall()
conn.close()
return rows
conn.close()
return []
@app.route('/logout')
@login_required
def logout():
user = current_user
user.authenticated = False
db.session.add(user)
db.session.commit()
logout_user()
return redirect(url_for('game'))
if __name__ == '__main__':
app.run(extra_files="config/config.yaml")