attendance-flask-mini-app/main.py

108 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python3
import hashlib
import hmac
import json
import sqlite3
import uuid
from datetime import datetime

from flask import Flask, request, render_template, current_app
from flask_qrcode import QRcode
from flask_socketio import SocketIO, emit
from flask_uuid import FlaskUUID
from werkzeug.middleware.proxy_fix import ProxyFix
# WIP: use sockets to broadcast when a hit to the expected uuid has been
# received, so that connected clients (the Chromium public browser) can
# refresh the qr code and update the list of users
# https://flask-socketio.readthedocs.io
# Module-level application setup. The objects created here (app, qrcode,
# socketio) are used by the route and Socket.IO handlers defined below.
app = Flask(__name__)
# Initialise the Flask-UUID extension; the '/<uuid:id>/<hashed>' route below
# uses a `uuid` URL converter (presumably the one this extension registers).
FlaskUUID(app)
#QRcode(app)
# Keep the extension instance: the handlers below call qrcode(url) directly.
qrcode = QRcode(app)
# NOTE(review): the secret key is hard-coded in the source file; consider
# loading it from the environment or an untracked config file.
app.config['SECRET_KEY'] = '53a8373e7ae652cd38beba15454b1dc4'
#app = ProxyFix(app, x_for=1, x_host=1)
# Wrap only the WSGI callable (not `app` itself) so `app` stays a Flask
# object; ProxyFix is used here with its default arguments.
app.wsgi_app = ProxyFix(app.wsgi_app)
# async_mode=None: no async backend is selected explicitly.
socketio = SocketIO(app, async_mode=None)
def get_db_connection():
    """Open a new connection to the application's SQLite database.

    The database file is ``database.db``, resolved relative to the current
    working directory. Rows come back as ``sqlite3.Row`` objects, so columns
    can be read by position or by name.

    Returns:
        A new ``sqlite3.Connection``; the caller is responsible for closing it.
    """
    connection = sqlite3.connect('database.db')
    connection.row_factory = sqlite3.Row
    return connection
def hash_string(string):
    """Return the salted MD5 hex digest of ``string``.

    The digest is computed over a fixed salt followed by the UTF-8 encoding
    of ``string``; ``check_hash`` recomputes the same value to validate it.

    Args:
        string: Text to hash.

    Returns:
        The 32-character lowercase hexadecimal digest.
    """
    salt = 'xpwQDFQ7gvcQ--rgO7l9yA'
    # Feeding the two parts separately hashes the same byte stream as
    # hashing their concatenation.
    digest = hashlib.md5()
    digest.update(salt.encode())
    digest.update(string.encode())
    return digest.hexdigest()
def check_hash(string, hashed_string):
    """Tell whether ``hashed_string`` is the salted MD5 digest of ``string``.

    The expected digest is built exactly as in ``hash_string`` (same salt,
    same algorithm). ``hashed_string`` comes straight from the request URL,
    so the comparison is done in constant time to avoid leaking, through
    response timing, how many leading characters matched.

    Args:
        string: The text the digest should belong to.
        hashed_string: The candidate hex digest supplied by the client.

    Returns:
        True when the candidate matches the expected digest, else False
        (including when the candidate is not a string).
    """
    salt = 'xpwQDFQ7gvcQ--rgO7l9yA'
    expected = hashlib.md5(salt.encode() + string.encode()).hexdigest()
    if not isinstance(hashed_string, str):
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # the candidate is attacker-controlled text.
    return hmac.compare_digest(
        expected.encode(),
        hashed_string.encode('utf-8', 'replace'),
    )
def generate_next_url():
    """Build the next one-time check-in URL.

    A fresh ``uuid.uuid1()`` value is generated and combined with its
    ``hash_string`` digest, giving ``<url_root><uuid>/<digest>`` where
    ``url_root`` is taken from the current request.

    Returns:
        The URL as a string.
    """
    token = str(uuid.uuid1())
    signature = hash_string(token)
    return request.url_root + token + '/' + signature
@app.route('/')
def show_qr_and_list():
    """Serve the root URL by rendering ``template.html``.

    No variables are passed to the template.
    """
    # TODO: reject direct connections to server; allow access only via proxy
    page = render_template("template.html")
    return page
@app.route('/scan')
def show_qr_reader():
    """Serve the static ``scan.html`` page for the ``/scan`` URL."""
    response = current_app.send_static_file('scan.html')
    return response
@app.route('/<uuid:id>/<hashed>')
def catch_uuids(id, hashed):
    """Handle a visit to a check-in URL and render the result page.

    The user name is read from the ``Remote-User`` request header. The
    request is then classified, in this order:

    * ``NO_USERNAME``    - the header is missing or empty;
    * ``ALREADY_USED``   - a row in ``hits`` already has this uuid;
    * ``DIFFERENT_NODE`` - ``hashed`` is not the valid digest for the uuid;
    * no error           - the hit is inserted and committed, a new URL and
      QR code are generated and broadcast in a ``qr_used`` Socket.IO event
      as ``(qr, url, user, time)``.

    Args:
        id: UUID taken from the first URL segment by the ``uuid`` converter.
        hashed: Second URL segment, expected to be the digest of the uuid.

    Returns:
        The rendered ``thanks.html`` page, given ``user``, ``time``,
        ``error`` (one of the codes above or None) and ``hits`` (the user's
        ten most recent rows from ``hits``).
    """
    user = request.headers.get('Remote-User')
    # TODO: Check directly with Authelia using https://auth.agofer.net/api/verify
    scanned_at = datetime.now().strftime("%A %Y-%m-%d %H:%M:%S")
    scanned_uuid = str(id)
    error = None
    conn = get_db_connection()
    # try/finally guarantees the connection is closed even when a query,
    # the QR generation, or the broadcast raises.
    try:
        existing = conn.execute(
            'SELECT * FROM hits WHERE uuid = ?', (scanned_uuid,)).fetchone()
        if not user:
            error = 'NO_USERNAME'
        elif existing:
            error = 'ALREADY_USED'
        elif not check_hash(scanned_uuid, str(hashed)):
            error = 'DIFFERENT_NODE'
        else:
            conn.execute(
                "INSERT INTO hits (uuid, user) VALUES (?, ?)",
                (scanned_uuid, user))
            conn.commit()
            # The current code is now spent: hand connected clients the
            # next URL together with who just checked in.
            url = generate_next_url()
            qr = qrcode(url)
            socketio.emit('qr_used', {'data': (qr, url, user, scanned_at)})
        # Fetched in every case, so the page can list the user's recent
        # hits even when this request was rejected.
        hits = conn.execute(
            'SELECT * FROM hits WHERE user = ? ORDER BY id DESC LIMIT 10', (user,)
        ).fetchall()
    finally:
        conn.close()
    return render_template(
        'thanks.html', user=user, time=scanned_at, error=error, hits=hits)
@socketio.on('message')
def handle_message(data):
    """Print the payload of an incoming Socket.IO ``message`` event.

    Args:
        data: The event payload as delivered by Flask-SocketIO.
    """
    # The payload is not guaranteed to be a str; str() keeps the
    # concatenation from raising TypeError on other payload types.
    print('received message: ' + str(data))
@socketio.on('connection')
def handle_initial_connection(json_data):
    """Answer a client's ``connection`` event with the current QR code.

    A new check-in URL and its QR code are generated, the ten most recent
    hits are read from the database, and everything is sent back to the
    emitting client in an ``initial_qr`` event as
    ``(qr, url, [(user, created), ...])``.

    Args:
        json_data: Payload sent with the event; only printed.
    """
    url = generate_next_url()
    qr = qrcode(url)
    conn = get_db_connection()
    # try/finally so the connection is closed even if the query fails.
    try:
        recent_hits = conn.execute(
            'SELECT user, created FROM hits ORDER BY id DESC LIMIT 10')
        # Copy into plain tuples before closing the connection.
        array_entries = [(row[0], row[1]) for row in recent_hits]
    finally:
        conn.close()
    emit('initial_qr', {'data': (qr, url, array_entries)})
    print('received json: ' + str(json_data))
if __name__ == '__main__':
    # Start through Flask-SocketIO's runner instead of app.run();
    # '0.0.0.0' listens on every network interface.
    bind_host = '0.0.0.0'
    socketio.run(app, host=bind_host)