168 lines
5.7 KiB
Python
Executable File
168 lines
5.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import uuid
|
|
import sqlite3
|
|
import base64
|
|
from datetime import datetime
|
|
import pytz
|
|
import os
|
|
from flask import Flask, request, render_template
|
|
from flask_uuid import FlaskUUID
|
|
from flask_qrcode import QRcode
|
|
from flask_socketio import SocketIO, emit
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
import hashlib
|
|
import requests
|
|
|
|
|
|
app = Flask(__name__)
|
|
FlaskUUID(app)
|
|
qrcode = QRcode(app)
|
|
app.config['SECRET_KEY'] = '53a8373e7ae652cd38beba15454b1dc4'
|
|
app.wsgi_app = ProxyFix(app.wsgi_app)
|
|
socketio = SocketIO(app, async_mode=None)
|
|
|
|
|
|
def get_db_connection():
|
|
conn = sqlite3.connect('database.db')
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def hash_string(string):
|
|
salt = 'xpwQDFQ7gvcQ--rgO7l9yA'
|
|
return hashlib.md5(salt.encode() + string.encode()).hexdigest()
|
|
|
|
|
|
def check_hash(string, hashed_string):
|
|
salt = 'xpwQDFQ7gvcQ--rgO7l9yA'
|
|
return hashed_string == hashlib.md5(salt.encode() + string.encode()).hexdigest()
|
|
|
|
|
|
def type_string(string):
|
|
return hashlib.md5(string.encode()).hexdigest()
|
|
|
|
|
|
def check_type(string):
|
|
if string == '02181ec55d150f6230547dbbf313e4f8':
|
|
type_check = 'CheckIn'
|
|
elif string == '231d5cc583e69d5b6c5bdced40dcc27c':
|
|
type_check = 'CheckOut'
|
|
else:
|
|
type_check = 'mistake'
|
|
return type_check
|
|
|
|
|
|
def generate_next_url(type, ip_branch):
|
|
next_uuid = str(uuid.uuid1())
|
|
url = str(next_uuid) + '/' + hash_string(next_uuid) + \
|
|
'/' + type_string(type) + '/' + (ip_branch)
|
|
message_bytes = url.encode('ascii')
|
|
base64_bytes = base64.b64encode(message_bytes)
|
|
url = base64_bytes.decode('ascii')
|
|
return url
|
|
|
|
|
|
@app.route('/')
|
|
def show_qr_and_list():
|
|
# TODO: reject direct connections to server; allow access only via proxy
|
|
get_list = requests.get(
|
|
'http://137.184.126.24:8080/hr_attendance_extended/public/attendance/').json()
|
|
list_ips = get_list["list_ips"]
|
|
ip_branch = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
|
|
if ip_branch in list_ips:
|
|
access = True
|
|
else:
|
|
access = False
|
|
return render_template("template.html", ip_branch=ip_branch, access=access)
|
|
|
|
|
|
@app.route('/<uuid:id>/<hashed>/<type>/<ip_branch>')
|
|
def catch_uuids(id, hashed, type, ip_branch):
|
|
user = request.headers.get('Remote-User')
|
|
# TODO: Check directly with Authelia using https://auth.agofer.net/api/verify
|
|
time = datetime.now(pytz.timezone('America/Bogota')
|
|
).strftime("%Y-%m-%d %H:%M:%S")
|
|
type_check = check_type(str(type))
|
|
error = None
|
|
data = []
|
|
conn = get_db_connection()
|
|
existing1 = conn.execute(
|
|
'SELECT * FROM hits WHERE uuid1 = ?', (str(id),)).fetchone()
|
|
existing2 = conn.execute(
|
|
'SELECT * FROM hits WHERE uuid2 = ?', (str(id),)).fetchone()
|
|
if not user:
|
|
error = 'NO_USERNAME'
|
|
elif existing1:
|
|
error = 'ALREADY_USED'
|
|
elif existing2:
|
|
error = 'ALREADY_USED'
|
|
elif not check_hash(str(id), str(hashed)):
|
|
error = 'DIFFERENT_NODE'
|
|
else:
|
|
if type_check == 'mistake':
|
|
error = 'DIFFERENT_TYPE'
|
|
else:
|
|
exit = conn.execute(
|
|
'SELECT * FROM hits WHERE user = ? AND dateout = ?', (user, '')).fetchone()
|
|
if type_check == 'CheckIn':
|
|
if exit:
|
|
error = 'MISSING_EXIT'
|
|
else:
|
|
conn.execute(
|
|
"INSERT INTO hits (user, uuid1, datein) VALUES (?, ?, ?)", (user, str(id), time))
|
|
conn.commit()
|
|
url = generate_next_url(type_check, str(ip_branch))
|
|
qr = qrcode(url)
|
|
socketio.emit('qr_used', {'data': (
|
|
qr, url, user, time, type_check,)})
|
|
elif type_check == 'CheckOut':
|
|
if exit:
|
|
conn.execute(
|
|
"UPDATE hits SET dateout = ?, uuid2 = ?, ip_branch = ? WHERE user = ? AND dateout = ?", (time, str(id), str(ip_branch), user, ''))
|
|
conn.commit()
|
|
url = generate_next_url(type_check, str(ip_branch))
|
|
qr = qrcode(url)
|
|
os.system('python get_query.py')
|
|
socketio.emit('qr_used', {'data': (
|
|
qr, url, user, time, type_check)})
|
|
else:
|
|
error = 'MISSING_ENTER'
|
|
data = conn.execute(
|
|
'SELECT * FROM hits WHERE user = ? ORDER BY id DESC LIMIT 10', (user,)).fetchall()
|
|
conn.close()
|
|
return render_template('thanks.html', user=user, time=time, error=error, type=type_check, hits=data)
|
|
|
|
|
|
@socketio.on('message')
|
|
def handle_message(data):
|
|
print('received message: ' + data)
|
|
|
|
|
|
@socketio.on('connection')
|
|
def handle_initial_connection(json_data):
|
|
ip_branch = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
|
|
url = generate_next_url('CheckIn', ip_branch)
|
|
url2 = generate_next_url('CheckOut', ip_branch)
|
|
qr = qrcode(url)
|
|
qr2 = qrcode(url2)
|
|
conn = get_db_connection()
|
|
last_entries = conn.execute(
|
|
'SELECT user, datein FROM hits WHERE datein != ? ORDER BY id DESC LIMIT 10', ('',)).fetchall()
|
|
last_entries2 = conn.execute(
|
|
'SELECT user, dateout FROM hits WHERE dateout != ? ORDER BY id DESC LIMIT 10', ('',)).fetchall()
|
|
array_entries = []
|
|
array_entries2 = []
|
|
for entry in last_entries:
|
|
array_entries.append((entry[0], entry[1]))
|
|
for entry2 in last_entries2:
|
|
array_entries2.append((entry2[0], entry2[1]))
|
|
conn.close()
|
|
emit('initial_qr', {
|
|
'data': (qr, url, array_entries, qr2, url2, array_entries2)})
|
|
print('received json: ' + str(json_data))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
socketio.run(app, host='0.0.0.0')
|