You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

408 lines
13 KiB

from flask.helpers import send_from_directory
import ldap as l
from ldap3 import Server, Connection
from ldap3.core.exceptions import LDAPBindError
from flask import Flask, g, request, redirect, url_for, render_template, flash
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, login_manager, current_user, login_user, \
logout_user, login_required
from flask_wtf import FlaskForm
from flask_cache_buster import CacheBuster
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired
from werkzeug.utils import secure_filename
from flask_bootstrap import Bootstrap
import short_url
import os
import sqlite3
# Application object and extension setup. Everything in this section runs at
# import time.
app = Flask(__name__)
Bootstrap(app)
# NOTE(review): hard-coded secret key; Flask uses it to sign session cookies.
# Presumably a placeholder -- load it from the environment before deploying.
app.secret_key = 'asdf'
# NOTE(review): debug mode is enabled unconditionally -- confirm this module
# is never served in production with this setting.
app.debug = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# NOTE(review): same hard-coded placeholder value as the secret key above.
app.config['WTF_CSRF_SECRET_KEY'] = 'asdf'
# Base LDAP settings: host, base DN and bind credentials are read from the
# environment (each is None when the variable is unset).
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication'
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST')
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN')
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME')
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD')
# Uploads: directory where the image view stores uploaded files and from
# which they are served.
app.config['UPLOAD_FOLDER'] = 'links/images'
# OpenLDAP
app.config['LDAP_OBJECTS_DN'] = 'dn'
app.config['LDAP_OPENLDAP'] = True
# Search filter template; the %s placeholder is filled with the username by
# the User model's LDAP lookups.
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))'
# Login cookies
#app.config['SESSION_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')
#app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')
# Prefix used when building the short links shown to users (None when the
# SHORT_DOMAIN environment variable is unset).
short_domain = os.environ.get('SHORT_DOMAIN')
# File extensions accepted by allowed_file() for image uploads.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
db = SQLAlchemy(app)
# This assignment rebinds the name `login_manager`, which the imports at the
# top of the file also bring in from flask_login.
login_manager = LoginManager(app)
# NOTE(review): the app was already passed to the constructor above, so this
# second registration looks redundant -- confirm before removing.
login_manager.init_app(app)
# Endpoint that @login_required sends unauthenticated visitors to.
login_manager.login_view = 'login'
# NOTE(review): this runs before the User model further down the file is
# declared, so presumably no table is created for it here -- confirm and
# consider moving the call after the model definition.
db.create_all()
# Options handed to CacheBuster: a list of file extensions and a hash size.
config = {
'extensions': ['.js', '.css', '.csv'],
'hash_size': 10
}
cache_buster = CacheBuster(config=config)
cache_buster.register_cache_buster(app)
# Shared LDAP connection, bound with the configured service account as soon
# as the module is imported (auto_bind=True). The User model uses it for
# directory searches.
server = Server(app.config['LDAP_HOST'])
conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True)
class User(db.Model):
    """Local account row for a user who authenticates against LDAP.

    Only the username and a logged-in flag are stored locally. Passwords are
    verified by binding to the LDAP server, and profile fields are read from
    the directory on demand.
    """

    __tablename__ = 'user'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100))
    # Set to True by the login view and back to False by the logout view.
    authenticated = db.Column(db.Boolean, default=False)

    def __init__(self, username):
        self.username = username

    @staticmethod
    def _user_filter(username):
        """Return the LDAP search filter for *username*.

        The value is escaped before it is substituted into the filter
        template so that characters such as ``*``, ``(`` or ``)`` in the
        submitted username cannot alter the search expression.
        """
        # Function-scope import: ldap3 is already a dependency of this file.
        from ldap3.utils.conv import escape_filter_chars
        template = app.config['LDAP_USER_OBJECT_FILTER']
        return template % escape_filter_chars(str(username))

    @staticmethod
    def try_login(username, password):
        """Verify *username* / *password* against the LDAP directory.

        The account is looked up with the shared service connection, then a
        separate bind is attempted as the matching entry with the supplied
        password.

        Returns:
            None when the bind succeeds.

        Raises:
            LDAPBindError: if either credential is blank, no directory entry
                matches the username, or the server refuses the bind.
        """
        # Reject blank credentials up front so they are reported as a failed
        # login instead of being sent to the directory at all.
        if not username or not password:
            raise LDAPBindError('username and password are required')
        conn.search(app.config['LDAP_BASE_DN'], User._user_filter(username), attributes=['*'])
        if len(conn.entries) > 0:
            user_conn = Connection(app.config['LDAP_HOST'], conn.entries[0].entry_dn, password, auto_bind=True)
            # The bind only serves as a password check; release the
            # connection straight away instead of leaving it open.
            user_conn.unbind()
            return
        raise LDAPBindError

    # NOTE(review): Flask-Login documents is_authenticated / is_active /
    # is_anonymous as properties. They are plain methods here, so an
    # attribute-style check such as `current_user.is_authenticated` sees a
    # bound method (always truthy) rather than the stored flag. Left as
    # methods to keep the existing call interface -- confirm and convert.
    def is_authenticated(self):
        """Return the stored logged-in flag."""
        return self.authenticated

    def is_active(self):
        """Every account is treated as active."""
        return True

    def is_anonymous(self):
        """Rows of this model are never anonymous users."""
        return False

    def get_id(self):
        """Return the primary key used to reload this user from the session."""
        return self.id

    def get_user_dict(self):
        """Fetch this user's directory entry and return it as a dict.

        Performs an LDAP search on every call.

        Returns:
            dict with the keys ``dn``, ``firstName``, ``lastName``, ``email``
            and ``userName``.

        Raises:
            IndexError: if the directory has no entry for this username.
        """
        conn.search(app.config['LDAP_BASE_DN'], self._user_filter(self.username), attributes=['*'])
        entry = conn.entries[0]
        return {
            'dn': entry.entry_dn,
            'firstName': entry.givenName.value,
            'lastName': entry.sn.value,
            'email': entry.mail.value,
            'userName': self.username,
        }
class LoginForm(FlaskForm):
    """Sign-in form: username, password, remember-me checkbox and submit.

    Username and password both carry a DataRequired validator.
    """

    username = StringField(label='Username', validators=[DataRequired()])
    password = PasswordField(label='Password', validators=[DataRequired()])
    remember_me = BooleanField(label='Remember Me')
    submit = SubmitField(label='Sign In')
@login_manager.user_loader
def load_user(id):
    """Reload a user from the id stored in the session.

    Args:
        id: the value previously returned by ``User.get_id()``.

    Returns:
        The matching ``User`` row, or None when the id is not a valid
        integer or no such row exists.
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        # A tampered or stale session value must not crash the request;
        # returning None makes the visitor count as logged out.
        return None
    return User.query.get(user_id)
@app.before_request
def get_current_user():
    """Expose the Flask-Login ``current_user`` as ``g.user`` on each request."""
    g.user = current_user
@app.route('/')
@login_required
def index():
    """Render the profile page listing the user's pastes, links and images."""
    # Dict literals evaluate top to bottom: the three listings are collected
    # first and the LDAP profile is fetched last.
    context = {
        'pastes': get_pastes_for_user(),
        'links': get_links_for_user(),
        'images': get_images_for_user(),
        'user': current_user.get_user_dict(),
        'short_domain': short_domain,
    }
    return render_template('profile.j2', **context)
@app.route('/link')
@login_required
def link():
    """Render the link-shortening page for the signed-in user."""
    profile = current_user.get_user_dict()
    return render_template('link.j2', user=profile, short_domain=short_domain)
@app.route('/paste')
@login_required
def text():
    """Render the paste editor page for the signed-in user."""
    profile = current_user.get_user_dict()
    return render_template('paste.j2', user=profile)
@app.route('/image', methods=['GET', 'POST'])
@login_required
def image():
    """Show the image upload page and handle image uploads.

    GET renders the empty upload form. POST validates the uploaded file,
    stores it in the upload folder, records it for the current user (reusing
    the existing row when the same user uploads the same filename again) and
    renders the page with the short link, or with an error message.
    """
    # One LDAP lookup per request; the result is reused for the template and
    # for the owner column.
    user = current_user.get_user_dict()

    def page(**extra):
        """Render the upload page with the common context plus *extra*."""
        return render_template('image.j2', user=user, short_domain=short_domain, **extra)

    if request.method != 'POST':
        return page()

    # The POST request must carry a 'file' part.
    if 'file' not in request.files:
        return page(error_msg="No file part.")
    file = request.files['file']
    # When no file is selected the browser submits an empty part without a
    # filename.
    if file.filename == '':
        return page(error_msg="No selected file.")
    if not (file and allowed_file(file.filename)):
        return page(error_msg="File type not allowed.")

    filename = secure_filename(file.filename)
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))

    db_conn = sqlite3.connect('links/links.db')
    try:
        c = db_conn.cursor()
        c.execute("SELECT * FROM images WHERE filename=? AND user_id=?", (filename, user['dn']))
        row = c.fetchone()
        if row is None:
            c.execute("INSERT INTO images (filename, user_id) VALUES (?, ?)", (filename, user['dn']))
            c.execute("SELECT * FROM images WHERE filename=? AND user_id=?", (filename, user['dn']))
            row = c.fetchone()
        db_conn.commit()
    finally:
        # Close the connection even when a statement fails.
        db_conn.close()

    # The first column of the row is encoded into the short link.
    url_fragment = short_url.encode_url(row[0])
    return page(success_msg="Your image link is <a target='_blank' href='{}/i/{}'>{}/i/{}</a>".format(short_domain, url_fragment, short_domain, url_fragment))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the sign-in form and authenticate submitted credentials.

    Credentials are verified against LDAP through ``User.try_login``. On
    success the local ``User`` row is created if it does not exist yet,
    flagged as authenticated, and the Flask-Login session is started.
    """
    if current_user.is_authenticated:
        flash('You are already logged in.')
        return redirect(url_for('index'))

    form = LoginForm(request.form)
    if request.method == 'POST' and form.validate():
        username = request.form.get('username')
        password = request.form.get('password')
        # Credentials are deliberately never printed or logged.
        try:
            User.try_login(username, password)
        except LDAPBindError:
            flash(
                'Invalid username or password. Please try again.',
                'danger')
            return render_template('login.j2', form=form)

        user = User.query.filter(User.username == username).first()
        if user is None:
            # User.__init__ accepts only the username; the password is
            # checked against LDAP and is not stored locally.
            user = User(username)
            db.session.add(user)
        user.authenticated = True
        db.session.commit()
        login_user(user, remember=form.remember_me.data)
        return redirect(url_for('index'))

    if form.errors:
        flash(form.errors, 'danger')
    return render_template('login.j2', form=form)
@app.route('/shorten', methods=['POST'])
@login_required
def shorten_url():
    """Create (or look up) a short link for the submitted URL.

    Reads the ``url`` form field. The same user submitting the same URL
    again gets the link of the existing row.

    Returns:
        An HTML snippet containing the short link, or the string ``'Error'``
        when no URL was submitted.
    """
    # The route only accepts POST, so no method check is needed. Using .get()
    # lets a missing field reach the 'Error' response instead of aborting.
    url = request.form.get('url')
    if not url:
        return 'Error'

    owner_dn = current_user.get_user_dict()['dn']
    db_conn = sqlite3.connect('links/links.db')
    try:
        c = db_conn.cursor()
        c.execute("SELECT * FROM links WHERE url=? AND user_id=?", (url, owner_dn))
        row = c.fetchone()
        if row is None:
            c.execute("INSERT INTO links (url, user_id) VALUES (?, ?)", (url, owner_dn))
            c.execute("SELECT * FROM links WHERE url=? AND user_id=?", (url, owner_dn))
            row = c.fetchone()
        db_conn.commit()
    finally:
        # Close the connection even when a statement fails.
        db_conn.close()

    # The first column of the row is encoded into the short link.
    url_fragment = short_url.encode_url(row[0])
    return "Your shortened link is <a target='_blank' href='{}/l/{}'>{}/l/{}</a>".format(short_domain, url_fragment, short_domain, url_fragment)
@app.route('/save', methods=['POST'])
@login_required
def save_paste():
    """Store the raw request body as a paste and return its short link.

    The same user saving identical content again gets the link of the
    existing row.

    Returns:
        ``{'success': True, 'msg': <html snippet>}`` on success, or
        ``{'success': False}`` when the request body is empty.
    """
    # Raw request body (bytes); the route only accepts POST.
    paste = request.data
    if not paste:
        return {'success': False}

    owner_dn = current_user.get_user_dict()['dn']
    db_conn = sqlite3.connect('links/links.db')
    try:
        c = db_conn.cursor()
        c.execute("SELECT * FROM pastes WHERE paste=? AND user_id=?", (paste, owner_dn))
        row = c.fetchone()
        if row is None:
            c.execute("INSERT INTO pastes (paste, user_id) VALUES (?, ?)", (paste, owner_dn))
            c.execute("SELECT * FROM pastes WHERE paste=? AND user_id=?", (paste, owner_dn))
            row = c.fetchone()
        db_conn.commit()
    finally:
        # Close the connection even when a statement fails.
        db_conn.close()

    # The first column of the row is encoded into the short link.
    url_fragment = short_url.encode_url(row[0])
    return {"success": True, "msg": "Your paste link is <a target='_blank' href='{}/p/{}'>{}/p/{}</a>".format(short_domain, url_fragment, short_domain, url_fragment)}
@app.route('/delete', methods=['POST'])
@login_required
def delete():
    """Delete one of the current user's links, pastes or images.

    Expects a JSON body with ``table`` (``'links'``, ``'pastes'`` or
    ``'images'``) and ``id``. Rows are only deleted when they belong to the
    current user.

    Returns:
        A dict with ``success`` and ``msg`` keys.
    """
    # Fixed statement per allowed table: the table name is never built from
    # request data.
    statements = {
        'links': "DELETE FROM links WHERE id=? AND user_id=?",
        'pastes': "DELETE FROM pastes WHERE id=? AND user_id=?",
        'images': "DELETE FROM images WHERE id=? AND user_id=?",
    }

    # A missing or malformed JSON body yields None here rather than raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {'success': False, 'msg': 'An error occurred.'}

    table = data.get('table')
    statement = statements.get(table) if isinstance(table, str) else None
    if statement is None:
        # Returned before any connection is opened, so nothing is left open.
        return {'success': False, 'msg': 'This table doesn\'t exist!'}
    if 'id' not in data:
        return {'success': False, 'msg': 'An error occurred.'}

    owner_dn = current_user.get_user_dict()['dn']
    db_conn = sqlite3.connect('links/links.db')
    try:
        db_conn.execute(statement, (data['id'], owner_dn))
        db_conn.commit()
    finally:
        db_conn.close()
    return {'success': True, 'msg': 'Deleted successfully!'}
@app.route('/l/<url>')
def expand_url(url):
    """Redirect a short link code to the URL stored for it.

    Args:
        url: the encoded short code taken from the request path.

    Returns:
        A redirect to the stored URL, or the 404 page (with a 404 status)
        when the code cannot be decoded or matches no row.
    """
    db_conn = sqlite3.connect('links/links.db')
    try:
        idx = short_url.decode_url(url)
        out = db_conn.execute("SELECT * FROM links WHERE id=?", (idx,)).fetchone()
    except (ValueError, OverflowError):
        # decode_url is expected to raise ValueError for characters outside
        # its alphabet; sqlite3 raises OverflowError for integers too large
        # to store. Either way the code cannot match a row.
        out = None
    finally:
        db_conn.close()

    if out is not None:
        # The second column holds the target URL.
        return redirect(out[1])
    return render_template('404.j2'), 404
@app.route('/p/<url>')
def show_paste(url):
    """Render the public page for the paste behind a short code.

    Args:
        url: the encoded short code taken from the request path.

    Returns:
        The rendered paste page, or the 404 page (with a 404 status) when
        the code cannot be decoded or matches no row.
    """
    db_conn = sqlite3.connect('links/links.db')
    try:
        idx = short_url.decode_url(url)
        out = db_conn.execute("SELECT * FROM pastes WHERE id=?", (idx,)).fetchone()
    except (ValueError, OverflowError):
        # decode_url is expected to raise ValueError for characters outside
        # its alphabet; sqlite3 raises OverflowError for integers too large
        # to store. Either way the code cannot match a row.
        out = None
    finally:
        db_conn.close()

    if out is None:
        return render_template('404.j2'), 404

    raw = out[1]
    if isinstance(raw, (bytes, bytearray)):
        # Pastes are saved from the raw request body; undecodable bytes are
        # replaced instead of failing the whole page.
        out_paste = bytes(raw).decode('utf-8', errors='replace')
    else:
        out_paste = str(raw)
    # NOTE(review): Flask only auto-escapes templates ending in .html, .htm,
    # .xml or .xhtml by default; confirm public_paste.j2 escapes `paste`.
    return render_template('public_paste.j2', paste=out_paste)
@app.route('/i/<url>')
def show_image(url):
    """Serve the uploaded image behind a short code.

    Args:
        url: the encoded short code taken from the request path.

    Returns:
        The image file served inline from the upload folder, or the 404 page
        (with a 404 status) when the code cannot be decoded or matches no
        row.
    """
    db_conn = sqlite3.connect('links/links.db')
    try:
        idx = short_url.decode_url(url)
        out = db_conn.execute("SELECT * FROM images WHERE id=?", (idx,)).fetchone()
    except (ValueError, OverflowError):
        # decode_url is expected to raise ValueError for characters outside
        # its alphabet; sqlite3 raises OverflowError for integers too large
        # to store. Either way the code cannot match a row.
        out = None
    finally:
        db_conn.close()

    if out is None:
        return render_template('404.j2'), 404

    # The second column holds the stored filename. It is passed positionally
    # because the keyword was renamed from `filename` to `path` in newer
    # Flask releases; the positional form works with both.
    return send_from_directory(app.config['UPLOAD_FOLDER'], out[1], as_attachment=False)
def get_pastes_for_user():
    """List the current user's pastes for the profile page.

    Returns:
        A list of ``(row_id, label_html, link)`` tuples. ``label_html`` is
        the short link followed by a preview of up to 80 characters of the
        paste, wrapped in a ``<span>``.
    """
    import html

    owner_dn = current_user.get_user_dict()["dn"]
    db_conn = sqlite3.connect('links/links.db')
    try:
        rows = db_conn.execute("SELECT * FROM pastes WHERE user_id=?", (owner_dn,)).fetchall()
    finally:
        db_conn.close()

    out = []
    for row in rows:
        link = "{}/p/{}".format(short_domain, short_url.encode_url(row[0]))
        raw = row[1]
        if isinstance(raw, (bytes, bytearray)):
            # Undecodable bytes are replaced so one paste cannot break the
            # whole listing.
            body = bytes(raw).decode('utf-8', errors='replace')
        else:
            body = str(raw)
        # The label carries its own <span> markup, so it is presumably
        # rendered as raw HTML; the user-supplied preview is escaped to keep
        # it from injecting markup.
        preview = html.escape(body[:80])
        label = "{}<span class='faded'> - {}</span>".format(link, preview)
        out.append((row[0], label, link))
    return out
def get_links_for_user():
    """List the current user's shortened links for the profile page.

    Returns:
        A list of ``(row_id, label_html, link)`` tuples. ``label_html`` is
        the short link followed by the target URL wrapped in a ``<span>``.
    """
    import html

    owner_dn = current_user.get_user_dict()["dn"]
    db_conn = sqlite3.connect('links/links.db')
    try:
        rows = db_conn.execute("SELECT * FROM links WHERE user_id=?", (owner_dn,)).fetchall()
    finally:
        db_conn.close()

    out = []
    for row in rows:
        link = "{}/l/{}".format(short_domain, short_url.encode_url(row[0]))
        # The label carries its own <span> markup, so it is presumably
        # rendered as raw HTML; the user-supplied URL is escaped to keep it
        # from injecting markup.
        target = html.escape(str(row[1]))
        label = "{}<span class='faded'> - {}</span>".format(link, target)
        out.append((row[0], label, link))
    return out
def get_images_for_user():
    """List the current user's uploaded images for the profile page.

    Returns:
        A list of ``(row_id, link, link)`` tuples; the short link appears
        twice so the tuples have the same shape as the paste and link
        listings.
    """
    owner_dn = current_user.get_user_dict()["dn"]
    db_conn = sqlite3.connect('links/links.db')
    try:
        rows = db_conn.execute("SELECT * FROM images WHERE user_id=?", (owner_dn,)).fetchall()
    finally:
        # Close the connection instead of leaving it to garbage collection.
        db_conn.close()

    out = []
    for row in rows:
        link = "{}/i/{}".format(short_domain, short_url.encode_url(row[0]))
        out.append((row[0], link, link))
    return out
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared
    case-insensitively. Names without a dot are rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
@app.route('/logout')
@login_required
def logout():
    """Clear the user's logged-in flag, end the session and go to sign-in."""
    # Unwrap the Flask-Login proxy so the database session is handed the
    # actual model instance rather than the proxy object.
    user = current_user._get_current_object()
    user.authenticated = False
    db.session.add(user)
    db.session.commit()
    logout_user()
    # No 'game' endpoint is defined in this file; send the visitor to the
    # sign-in page instead of failing while building the URL.
    return redirect(url_for('login'))
if __name__ == '__main__':
    # Start Flask's built-in server only when this file is run as a script.
    app.run()