from flask.helpers import send_from_directory
|
|
import ldap as l
|
|
from ldap3 import Server, Connection
|
|
from ldap3.core.exceptions import LDAPBindError
|
|
from flask import Flask, g, request, redirect, url_for, render_template, flash
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import LoginManager, login_manager, current_user, login_user, \
|
|
logout_user, login_required
|
|
from flask_wtf import FlaskForm
|
|
from flask_cache_buster import CacheBuster
|
|
from wtforms import StringField, PasswordField, BooleanField, SubmitField
|
|
from wtforms.validators import DataRequired
|
|
from werkzeug.utils import secure_filename
|
|
from flask_bootstrap import Bootstrap
|
|
import short_url
|
|
import os
|
|
import sqlite3
|
|
|
|
app = Flask(__name__)
|
|
Bootstrap(app)
|
|
app.secret_key = 'asdf'
|
|
app.debug = True
|
|
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
|
app.config['WTF_CSRF_SECRET_KEY'] = 'asdf'
|
|
|
|
# Base
|
|
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication'
|
|
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST')
|
|
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN')
|
|
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME')
|
|
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD')
|
|
|
|
# Uploads
|
|
app.config['UPLOAD_FOLDER'] = 'links/images'
|
|
|
|
# OpenLDAP
|
|
app.config['LDAP_OBJECTS_DN'] = 'dn'
|
|
app.config['LDAP_OPENLDAP'] = True
|
|
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))'
|
|
|
|
# Login cookies
|
|
#app.config['SESSION_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')
|
|
#app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')
|
|
|
|
short_domain = os.environ.get('SHORT_DOMAIN')
|
|
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
|
|
|
|
db = SQLAlchemy(app)
|
|
|
|
login_manager = LoginManager(app)
|
|
login_manager.init_app(app)
|
|
login_manager.login_view = 'login'
|
|
|
|
db.create_all()
|
|
|
|
config = {
|
|
'extensions': ['.js', '.css', '.csv'],
|
|
'hash_size': 10
|
|
}
|
|
|
|
cache_buster = CacheBuster(config=config)
|
|
cache_buster.register_cache_buster(app)
|
|
|
|
|
|
server = Server(app.config['LDAP_HOST'])
|
|
conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True)
|
|
|
|
class User(db.Model):
|
|
|
|
__tablename__ = 'user'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(100))
|
|
authenticated = db.Column(db.Boolean, default=False)
|
|
|
|
def __init__(self, username):
|
|
self.username = username
|
|
|
|
@staticmethod
|
|
def try_login(username, password):
|
|
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % username, attributes=['*'])
|
|
if len(conn.entries) > 0:
|
|
Connection(app.config['LDAP_HOST'], conn.entries[0].entry_dn, password, auto_bind=True)
|
|
return
|
|
raise LDAPBindError
|
|
|
|
def is_authenticated(self):
|
|
return self.authenticated
|
|
|
|
def is_active(self):
|
|
return True
|
|
|
|
def is_anonymous(self):
|
|
return False
|
|
|
|
def get_id(self):
|
|
return self.id
|
|
|
|
def get_user_dict(self):
|
|
user = {'dn': '',
|
|
'firstName': '',
|
|
'lastName': '',
|
|
'email': '',
|
|
'userName': self.username,
|
|
}
|
|
|
|
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % self.username, attributes=['*'])
|
|
|
|
user['dn'] = conn.entries[0].entry_dn
|
|
user['firstName'] = conn.entries[0].givenName.value
|
|
user['lastName'] = conn.entries[0].sn.value
|
|
user['email'] = conn.entries[0].mail.value
|
|
|
|
return user
|
|
|
|
|
|
class LoginForm(FlaskForm):
|
|
username = StringField('Username', validators=[DataRequired()])
|
|
password = PasswordField('Password', validators=[DataRequired()])
|
|
remember_me = BooleanField('Remember Me')
|
|
submit = SubmitField('Sign In')
|
|
|
|
|
|
@login_manager.user_loader
|
|
def load_user(id):
|
|
return User.query.get(int(id))
|
|
|
|
|
|
@app.before_request
|
|
def get_current_user():
|
|
g.user = current_user
|
|
|
|
|
|
@app.route('/')
|
|
@login_required
|
|
def index():
|
|
pastes = get_pastes_for_user()
|
|
links = get_links_for_user()
|
|
images = get_images_for_user()
|
|
return render_template('profile.j2', user = current_user.get_user_dict(), short_domain = short_domain, links = links, pastes = pastes, images = images)
|
|
|
|
|
|
@app.route('/link')
|
|
@login_required
|
|
def link():
|
|
return render_template('link.j2', user = current_user.get_user_dict(), short_domain = short_domain)
|
|
|
|
|
|
@app.route('/paste')
|
|
@login_required
|
|
def text():
|
|
return render_template('paste.j2', user = current_user.get_user_dict())
|
|
|
|
|
|
@app.route('/image', methods=['GET', 'POST'])
|
|
@login_required
|
|
def image():
|
|
if request.method == 'POST':
|
|
print(request.files)
|
|
# check if the post request has the file part
|
|
if 'file' not in request.files:
|
|
return render_template('image.j2', user = current_user.get_user_dict(), short_domain = short_domain, error_msg = "No file part.")
|
|
file = request.files['file']
|
|
# if user does not select file, browser also
|
|
# submit an empty part without filename
|
|
if file.filename == '':
|
|
return render_template('image.j2', user = current_user.get_user_dict(), short_domain = short_domain, error_msg = "No selected file.")
|
|
if file and allowed_file(file.filename):
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
filename = secure_filename(file.filename)
|
|
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
|
|
c.execute("SELECT * FROM images WHERE filename=? AND user_id=?", (filename, current_user.get_user_dict()['dn']))
|
|
row = c.fetchone()
|
|
|
|
if row == None:
|
|
c.execute("INSERT INTO images (filename, user_id) VALUES (?, ?)", (filename, current_user.get_user_dict()['dn']))
|
|
|
|
c.execute("SELECT * FROM images WHERE filename=? AND user_id=?", (filename, current_user.get_user_dict()['dn']))
|
|
row = c.fetchone()
|
|
print(row[0])
|
|
conn.commit()
|
|
conn.close()
|
|
url_fragment = short_url.encode_url(row[0])
|
|
return render_template('image.j2', user = current_user.get_user_dict(), short_domain = short_domain, success_msg = "Your image link is <a target='_blank' href='{}/i/{}'>{}/i/{}</a>".format(short_domain, url_fragment, short_domain, url_fragment))
|
|
return render_template('image.j2', user = current_user.get_user_dict(), short_domain = short_domain)
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if current_user.is_authenticated:
|
|
flash('You are already logged in.')
|
|
return redirect(url_for('index'))
|
|
|
|
form = LoginForm(request.form)
|
|
print(form)
|
|
print(request.method)
|
|
|
|
if request.method == 'POST' and form.validate():
|
|
username = request.form.get('username')
|
|
password = request.form.get('password')
|
|
print(username)
|
|
print(password)
|
|
|
|
try:
|
|
User.try_login(username, password)
|
|
except LDAPBindError:
|
|
flash(
|
|
'Invalid username or password. Please try again.',
|
|
'danger')
|
|
return render_template('login.j2', form=form)
|
|
|
|
user = User.query.filter(User.username == username).first()
|
|
|
|
print(user)
|
|
if user is None:
|
|
user = User(username, password)
|
|
db.session.add(user)
|
|
user.authenticated = True
|
|
db.session.commit()
|
|
login_user(user, remember=form.remember_me.data)
|
|
|
|
print('You have successfully logged in.')
|
|
return redirect(url_for('index'))
|
|
|
|
if form.errors:
|
|
flash(form.errors, 'danger')
|
|
|
|
return render_template('login.j2', form=form)
|
|
|
|
|
|
@app.route('/shorten', methods=['POST'])
|
|
@login_required
|
|
def shorten_url():
|
|
if request.method == 'POST':
|
|
url = request.form['url']
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
|
|
if url is not None and len(url) > 0:
|
|
c.execute("SELECT * FROM links WHERE url=? AND user_id=?", (url, current_user.get_user_dict()['dn']))
|
|
row = c.fetchone()
|
|
|
|
if row == None:
|
|
c.execute("INSERT INTO links (url, user_id) VALUES (?, ?)", (url, current_user.get_user_dict()['dn']))
|
|
c.execute("SELECT * FROM links WHERE url=? AND user_id=?", (url, current_user.get_user_dict()['dn']))
|
|
row = c.fetchone()
|
|
print(row[0])
|
|
conn.commit()
|
|
conn.close()
|
|
url_fragment = short_url.encode_url(row[0])
|
|
return "Your shortened link is <a target='_blank' href='{}/l/{}'>{}/l/{}</a>".format(short_domain, url_fragment, short_domain, url_fragment)
|
|
conn.commit()
|
|
conn.close()
|
|
return 'Error'
|
|
|
|
|
|
@app.route('/save', methods=['POST'])
|
|
@login_required
|
|
def save_paste():
|
|
if request.method == 'POST':
|
|
paste = request.data
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
|
|
if paste is not None and len(paste) > 0:
|
|
c.execute("SELECT * FROM pastes WHERE paste=? AND user_id=?", (paste, current_user.get_user_dict()['dn']))
|
|
row = c.fetchone()
|
|
|
|
if row == None:
|
|
c.execute("INSERT INTO pastes (paste, user_id) VALUES (?, ?)", (paste, current_user.get_user_dict()['dn']))
|
|
|
|
c.execute("SELECT * FROM pastes WHERE paste=? AND user_id=?", (paste, current_user.get_user_dict()['dn']))
|
|
row = c.fetchone()
|
|
print(row[0])
|
|
conn.commit()
|
|
conn.close()
|
|
url_fragment = short_url.encode_url(row[0])
|
|
return {"success": True, "msg": "Your paste link is <a target='_blank' href='{}/p/{}'>{}/p/{}</a>".format(short_domain, url_fragment, short_domain, url_fragment)}
|
|
conn.commit()
|
|
conn.close()
|
|
return {'success': False}
|
|
|
|
|
|
@app.route('/delete', methods=['POST'])
|
|
@login_required
|
|
def delete():
|
|
if request.method == 'POST':
|
|
data = request.json
|
|
table = data['table']
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
if table == 'links':
|
|
c.execute("DELETE FROM links WHERE id=? AND user_id=?", (data['id'], current_user.get_user_dict()['dn']))
|
|
elif table == 'pastes':
|
|
c.execute("DELETE FROM pastes WHERE id=? AND user_id=?", (data['id'], current_user.get_user_dict()['dn']))
|
|
elif table == 'images':
|
|
c.execute("DELETE FROM images WHERE id=? AND user_id=?", (data['id'], current_user.get_user_dict()['dn']))
|
|
else:
|
|
return {'success': False, 'msg': 'This table doesn\'t exist!'}
|
|
conn.commit()
|
|
conn.close()
|
|
return {'success': True, 'msg': 'Deleted successfully!'}
|
|
return {'success': False, 'msg': 'An error occurred.'}
|
|
|
|
|
|
@app.route('/l/<url>')
|
|
def expand_url(url):
|
|
idx = short_url.decode_url(url)
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM links WHERE id=?", (idx,))
|
|
out = c.fetchone()
|
|
|
|
if out != None:
|
|
out_link = out[1]
|
|
return redirect(out_link)
|
|
return render_template('404.j2')
|
|
|
|
|
|
@app.route('/p/<url>')
|
|
def show_paste(url):
|
|
idx = short_url.decode_url(url)
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM pastes WHERE id=?", (idx,))
|
|
out = c.fetchone()
|
|
|
|
if out != None:
|
|
out_paste = str(out[1], 'utf-8')
|
|
return render_template('public_paste.j2', paste = out_paste)
|
|
return render_template('404.j2')
|
|
|
|
|
|
@app.route('/i/<url>')
|
|
def show_image(url):
|
|
idx = short_url.decode_url(url)
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM images WHERE id=?", (idx,))
|
|
out = c.fetchone()
|
|
|
|
if out != None:
|
|
filename = out[1]
|
|
return send_from_directory(app.config['UPLOAD_FOLDER'], filename=filename, as_attachment=False)
|
|
return render_template('404.j2')
|
|
|
|
def get_pastes_for_user():
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM pastes WHERE user_id=?", (current_user.get_user_dict()["dn"],))
|
|
|
|
out = []
|
|
for row in c.fetchall():
|
|
a = "{}/p/{}<span class='faded'> - {}</span>".format(short_domain, short_url.encode_url(row[0]), str(row[1], 'utf-8')[:80])
|
|
b = "{}/p/{}".format(short_domain, short_url.encode_url(row[0]))
|
|
out.append((row[0], a, b))
|
|
|
|
return out
|
|
|
|
|
|
def get_links_for_user():
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM links WHERE user_id=?", (current_user.get_user_dict()["dn"],))
|
|
|
|
out = []
|
|
for row in c.fetchall():
|
|
a = "{}/l/{}<span class='faded'> - {}</span>".format(short_domain, short_url.encode_url(row[0]), row[1])
|
|
b = "{}/l/{}".format(short_domain, short_url.encode_url(row[0]))
|
|
out.append((row[0], a, b))
|
|
|
|
return out
|
|
|
|
|
|
def get_images_for_user():
|
|
conn = sqlite3.connect('links/links.db')
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM images WHERE user_id=?", (current_user.get_user_dict()["dn"],))
|
|
|
|
out = []
|
|
for row in c.fetchall():
|
|
a = "{}/i/{}".format(short_domain, short_url.encode_url(row[0]))
|
|
out.append((row[0], a, a))
|
|
|
|
return out
|
|
|
|
|
|
def allowed_file(filename):
|
|
return '.' in filename and \
|
|
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
@app.route('/logout')
|
|
@login_required
|
|
def logout():
|
|
user = current_user
|
|
user.authenticated = False
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
logout_user()
|
|
return redirect(url_for('game'))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
app.run()
|