Compare commits

...

5 Commits

View File

@@ -5,11 +5,9 @@ from datetime import datetime, timedelta
from threading import Thread from threading import Thread
import journal import journal
# SUPPORTED_REGISTER_TYPES = ['D', 'M'] # SUPPORTED_REGISTER_TYPES = ['D', 'M']
SUPPORTED_REGISTER_TYPES = ['D'] SUPPORTED_REGISTER_TYPES = ['D']
LOG_TYPE_ON_SCAN = 'on-scan' LOG_TYPE_ON_SCAN = 'on-scan'
LOG_TYPE_ON_CHANGE = 'on-change' LOG_TYPE_ON_CHANGE = 'on-change'
@@ -109,7 +107,6 @@ class Endpoint(Thread):
self.ranges = parse_ranges(config) self.ranges = parse_ranges(config)
# база данных # база данных
self.connection = None
self.db_table = config['database']['db-table'] self.db_table = config['database']['db-table']
self.db_col_names = '' self.db_col_names = ''
for rn in self.ranges: for rn in self.ranges:
@@ -134,14 +131,21 @@ class Endpoint(Thread):
return values return values
def __read_last_db_regs(self): def __read_last_db_regs(self):
with self.connection.cursor() as cursor: with connect(
cursor.execute(f"select {self.db_col_names} from `{self.db_table}`" host=self.config["database"]["host"],
f"order by `{self.config['database']['db-datetime-col']}` desc limit 1;") port=self.config["database"]["port"],
row = cursor.fetchall() user=self.config["database"]["db-user"],
if len(row) != 0: password=self.config["database"]["db-password"],
return row[0] database=self.config["database"]["db-name"],
else: ) as connection:
return None with connection.cursor() as cursor:
cursor.execute(f"select {self.db_col_names} from `{self.db_table}`"
f"order by `{self.config['database']['db-datetime-col']}` desc limit 1;")
row = cursor.fetchall()
if len(row) != 0:
return row[0]
else:
return None
def __insert_db_row(self, values): def __insert_db_row(self, values):
vals_str = "" vals_str = ""
@@ -151,10 +155,17 @@ class Endpoint(Thread):
else: else:
vals_str += f"{v}" vals_str += f"{v}"
query = f"insert into `{self.db_table}` ({self.db_col_names}) values ({vals_str});" with connect(
journal.log(f"Executing query: {query}") host=self.config["database"]["host"],
with self.connection.cursor() as cursor: port=self.config["database"]["port"],
cursor.execute(query) user=self.config["database"]["db-user"],
password=self.config["database"]["db-password"],
database=self.config["database"]["db-name"]
) as connection:
query = f"insert into `{self.db_table}` ({self.db_col_names}) values ({vals_str});"
journal.log(f"[{self.config['database']['db-user']}:{self.db_table}] Executing query: {query}")
with connection.cursor() as cursor:
cursor.execute(query)
def __endless_loop(self): def __endless_loop(self):
# if self.log_type == 'on-change': # if self.log_type == 'on-change':
@@ -168,40 +179,37 @@ class Endpoint(Thread):
last_row = None last_row = None
while True: while True:
# время следующего опроса try:
need_time = last_query + scan_rate row = self.__mb_read_all_regs()
while True: if self.log_type == LOG_TYPE_ON_SCAN:
try: self.__insert_db_row(row)
row = self.__mb_read_all_regs() last_row = row
if self.log_type == LOG_TYPE_ON_SCAN: else:
equal = True
if last_row is None:
equal = False
else:
for i in range(0, len(last_row)):
if last_row[i] != row[i]:
equal = False
break
if not equal:
self.__insert_db_row(row) self.__insert_db_row(row)
last_row = row last_row = row
else: except Exception as ex:
equal = True journal.log(f"[{self.config['database']['db-user']}:{self.db_table}] Exception: {ex}")
if last_row is None:
equal = False
else:
for i in range(0, len(last_row)):
if last_row[i] != row[i]:
equal = False
break
if not equal:
self.__insert_db_row(row)
last_row = row
last_query = need_time
break
except Exception as ex:
journal.log(f"Exception: {ex}")
time.sleep(0.1) # небольшая задержка, чтоб не спамить запросами
need_time = datetime.now() + scan_rate
# вычислим время до следующего опроса и подождем # вычислим время до следующего опроса и подождем
# время следующего опроса
need_time = last_query + scan_rate
curr_time = datetime.now() curr_time = datetime.now()
if need_time > curr_time: if need_time > curr_time:
delta = need_time - curr_time delta = need_time - curr_time
time.sleep(delta.seconds + (delta.microseconds / 1000000)) time.sleep(delta.seconds + (delta.microseconds / 1000000))
last_query = need_time
else:
last_query = datetime.now()
def run(self): def run(self):
# запускаем клиента модбас # запускаем клиента модбас
@@ -210,15 +218,6 @@ class Endpoint(Thread):
port=self.config["modbus"]["port"], port=self.config["modbus"]["port"],
debug=self.config["modbus"]["debug"], debug=self.config["modbus"]["debug"],
auto_open=True) auto_open=True)
with connect(
host=self.config["database"]["host"],
port=self.config["database"]["port"],
user=self.config["database"]["db-user"],
password=self.config["database"]["db-password"],
database=self.config["database"]["db-name"]
) as _connection:
self.connection = _connection
# запуск бесконечного цикла
self.__endless_loop()
# запуск бесконечного цикла
self.__endless_loop()