soc-streamer/server.py
2024-05-02 10:48:04 +03:00

223 lines
7.4 KiB
Python

#!/bin/python
import traceback
from datetime import datetime, timedelta
import socket
from threading import Thread, Lock
from streamer_utils import SocketBlocksWrapper
import sqlite3
db_connection = sqlite3.connect("boards.sqlite3", check_same_thread=False)
LISTEN = ('', 40100)
MAX_ATTEMPTS = 3
BLOCK_TIME_MINUTES = 3
__log_lock = Lock()
def _log(message, owner="__base__"):
with __log_lock:
print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] {owner}: {message}")
class ServerWorker(Thread):
def __init__(self, factory, conn, addr):
super().__init__(daemon=True)
self.conn = conn
self._log_name = f"Worker-{addr}"
self.factory = factory
self._client_type = None
self._client_name = None
self._cursor = db_connection.cursor()
def _auth_board(self, obj):
if 'name' not in obj:
return True, 'missing field "name"!'
self._cursor.execute("SELECT board_name, password FROM users WHERE board_name = ?;", (obj['name'], ))
res = self._cursor.fetchone()
if res is None:
return True, 'this board is not registered!'
self._client_name = obj['name']
self._client_type = "board"
return False, None
def _auth_client(self, obj):
if 'target' not in obj:
return True, 'missing field "target"!'
if 'password' not in obj:
return True, 'missing field "password"!'
self._cursor.execute("SELECT board_name, password, blocked_until, wrong_attempts FROM users "
"WHERE board_name = ?;",
(obj['target'], ))
res = self._cursor.fetchone()
# сначала проверка на то, что плата существует
if res is None:
return True, 'Аккаунт не найден!'
# теперь на то, что аккаунт не заблокирован
now = datetime.now()
delta = res[2] - int(round(now.timestamp()))
print(delta)
if delta >= 0:
return True, f'Доступ отклонен: аккаунт заблокирован! Разблокировка через {delta} секунд'
# проверка пароля
if res[1] != obj['password']:
if res[3] >= MAX_ATTEMPTS - 1:
dt = now + timedelta(minutes=BLOCK_TIME_MINUTES)
t = int(round(dt.timestamp()))
self._cursor.execute("UPDATE users SET wrong_attempts = 0, blocked_until = ? WHERE board_name = ?;",
(t, res[0]))
db_connection.commit()
return True, f'Доступ отклонен: аккаунт заблокирован! Разблокировка через {BLOCK_TIME_MINUTES} минут(ы)'
else:
self._cursor.execute("UPDATE users SET wrong_attempts = wrong_attempts + 1 WHERE board_name = ?;",
(res[0], ))
db_connection.commit()
return True, 'Доступ отклонен: неверный пароль!'
# обновление неудачных попыток в случае если пароль верный
if res[3] != 0:
self._cursor.execute("UPDATE users SET wrong_attempts = 0 WHERE board_name = ?;",
(res[0], ))
db_connection.commit()
self._client_name = obj['target']
self._client_type = "client"
return False, None
def _auth(self):
_log("Wait for auth...", self._log_name)
err = True
obj = self.conn.read_object()
if type(obj) != dict:
description = "invalid object type!"
elif 'type' not in obj:
description = 'missing field "type"!'
elif obj['type'] != 'auth':
description = 'field "type" must have value "auth"!'
elif 'client-type' not in obj:
description = 'missing field "client-type"!'
elif obj['client-type'] != 'board' and obj['client-type'] != 'client':
description = f'unsupported client type: "{obj["client-type"]}"'
else:
if obj['client-type'] == 'board':
err, description = self._auth_board(obj)
else:
err, description = self._auth_client(obj)
response = {
'type': 'auth-response'
}
if err:
response["status"] = "failed"
response["description"] = description
else:
response["status"] = "success"
_log(f"auth {response['status']}! request={obj}, response={response}", owner=self._log_name)
self.conn.write_object(response)
return not err
def run(self):
try:
with self.conn:
if self._auth():
while True:
recv = self.conn.read_object()
if recv is None:
break
# _log(f"received {recv['type']} frame", self._log_name)
self.factory.route_packet(self, recv)
except Exception:
traceback.print_exc()
finally:
self.factory.remove_connection(self)
_log(f"Close connection!", self._log_name)
def get_name(self):
return self._client_name
def get_log_name(self):
return self._log_name
def send_packet(self, packet):
self.conn.write_object(packet)
# _log("send routed packet", self._log_name)
class ServerWorkerFactory:
def __init__(self):
self._lock = Lock()
self._connections = []
def add_connection(self, conn, addr):
with self._lock:
worker = ServerWorker(self, conn, addr)
worker.start()
self._connections.append(worker)
def remove_connection(self, conn: ServerWorker):
with self._lock:
if conn in self._connections:
_log(f"remove connection {conn.get_log_name()}", "ServerWorkerFactory")
self._connections.remove(conn)
def route_packet(self, owner: ServerWorker, data):
connections = None
with self._lock:
if owner in self._connections:
connections = self._connections.copy()
if connections is not None:
name = owner.get_name()
for c in connections:
if c == owner:
continue
if c.get_name() == name:
c.send_packet(data)
def server_listener():
_log("============ SERVER ============")
_log("Creating table...")
cur = db_connection.cursor()
cur.execute("""CREATE TABLE IF NOT EXISTS users (
board_name TEXT NOT NULL PRIMARY KEY,
password TEXT DEFAULT '' NOT NULL,
blocked_until INT DEFAULT 0 NOT NULL,
wrong_attempts INT DEFAULT 0 NOT NULL
);""")
db_connection.commit()
with socket.socket() as sock:
sock.bind(LISTEN)
_log(f"socket bind success", "ServerTask")
sock.listen(8)
_log(f"socket listen...", "ServerTask")
worker = ServerWorkerFactory()
while True:
connection, addr = sock.accept()
_log(f"connected from {addr}", "ServerTask")
worker.add_connection(SocketBlocksWrapper(connection), addr)
if __name__ == '__main__':
server_listener()