mb-tcp-logger/endpoint.py
2023-10-26 20:58:33 +03:00

222 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from pyModbusTCP.client import ModbusClient
from mysql.connector import connect
from datetime import datetime, timedelta
from threading import Thread
import journal
# SUPPORTED_REGISTER_TYPES = ['D', 'M']
SUPPORTED_REGISTER_TYPES = ['D']
LOG_TYPE_ON_SCAN = 'on-scan'
LOG_TYPE_ON_CHANGE = 'on-change'
def check_duplicates(regs: list):
for i in range(0, len(regs) - 1):
for j in range(i + 1, len(regs)):
if regs[i][0] == regs[j][0] or regs[i][1] == regs[j][1]:
raise Exception(f"Found register duplicates of {regs[i]}")
return False
def get_reg_id(reg):
r_type = reg[0][0]
if r_type not in SUPPORTED_REGISTER_TYPES:
raise Exception(f"Invalid register type {reg} ({r_type})")
r_val = int(reg[0][1:])
r_name = reg[1]
return r_type, r_val, r_name
def find_min_reg(regs: list):
ret = regs[0]
r = get_reg_id(regs[0])
for reg in regs:
rt = get_reg_id(reg)
# [0] - тип, [1] - номер, [2] - имя
if r[0] == rt[0] and r[1] > rt[1]:
r = rt
ret = reg
return ret
def find_range(regs: list):
# [0] - тип, [1] - номер, [2] - имя
start_reg = find_min_reg(regs)
# сразу удалим запись из списка
regs.remove(start_reg)
start_reg = get_reg_id(start_reg)
reg_names = [start_reg[2]]
# тип регистра содержится в первой букве его названия
reg_type = start_reg[0]
# стартовый регистр
reg_id = start_reg[1]
# регистров у нас как минимум 1 (этот же как-то нашли)
regs_count = 1
# теперь пытаемся найти все регистры
while True:
found_r = None
for rd in regs:
r = get_reg_id(rd)
if r[0] == reg_type and r[1] - regs_count == reg_id:
regs_count += 1
found_r = rd
reg_names.append(r[2])
break
if found_r is not None:
regs.remove(found_r)
else:
break
return {
"reg_type": reg_type,
"reg_addr": reg_id,
"count": regs_count,
"names": tuple(reg_names)
}
def parse_ranges(config):
regs = list(config["registers"])
check_duplicates(regs)
ranges = []
while len(regs) > 0:
ranges.append(find_range(regs))
return ranges
class Endpoint(Thread):
def __init__(self, config):
super().__init__()
# применяем настройки modbus
self.config = config
self.mb = None
self.log_type = config["modbus"]["log-type"]
if self.log_type not in [LOG_TYPE_ON_CHANGE, LOG_TYPE_ON_SCAN]:
raise Exception(f"Unsupported logging type '{self.log_type}'")
# список диапазонов и значений
self.ranges = parse_ranges(config)
# база данных
self.connection = None
self.cursor = None
self.db_table = config['database']['db-table']
self.db_col_names = ''
for rn in self.ranges:
for name in rn['names']:
if len(self.db_col_names) != 0:
self.db_col_names += ', '
self.db_col_names += f'`{name}`'
if config['database']['driver'] != "mysql":
raise Exception(f"Unsupported database driver {config['database']['driver']}")
def __mb_read_all_regs(self):
values = []
for rn in self.ranges:
if rn['reg_type'] == 'D':
res = self.mb.read_holding_registers(rn['reg_addr'], rn['count'])
if not res:
raise Exception("Unable to read registers")
for v in res:
values.append(v)
return values
def __read_last_db_regs(self):
self.cursor.execute(f"select {self.db_col_names} from `{self.db_table}`"
f"order by `{self.config['database']['db-datetime-col']}` desc limit 1;")
row = self.cursor.fetchall()
if len(row) != 0:
return row[0]
else:
return None
def __insert_db_row(self, values):
vals_str = ""
for v in values:
if len(vals_str) != 0:
vals_str += f", {v}"
else:
vals_str += f"{v}"
query = f"insert into `{self.db_table}` ({self.db_col_names}) values ({vals_str});"
journal.log(f"Executing query: {query}")
self.cursor.execute(query)
def __endless_loop(self):
# if self.log_type == 'on-change':
# pass
# else:
last_query = datetime.now()
scan_rate = timedelta(microseconds=self.config['modbus']['scan-rate'] * 1000)
if self.log_type == LOG_TYPE_ON_CHANGE:
last_row = self.__read_last_db_regs()
else:
last_row = None
while True:
# вычислим время до следующего опроса
curr_time = datetime.now()
need_time = last_query + scan_rate
if need_time > curr_time:
delta = need_time - curr_time
time.sleep(delta.seconds + (delta.microseconds / 1000000))
while True:
try:
row = self.__mb_read_all_regs()
if self.log_type == LOG_TYPE_ON_SCAN:
self.__insert_db_row(row)
last_row = row
else:
equal = True
if last_row is None:
equal = False
else:
for i in range(0, len(last_row)):
if last_row[i] != row[i]:
equal = False
break
if not equal:
self.__insert_db_row(row)
last_row = row
last_query = need_time
break
except Exception as ex:
journal.log(f"Exception: {ex}")
time.sleep(0.1) # небольшая задержка, чтоб не спамить запросами
need_time = datetime.now() + scan_rate
def run(self):
# запускаем клиента модбас
self.mb = ModbusClient(
host=self.config["modbus"]["host"],
port=self.config["modbus"]["port"],
debug=self.config["modbus"]["debug"],
auto_open=True)
with connect(
host=self.config["database"]["host"],
port=self.config["database"]["port"],
user=self.config["database"]["db-user"],
password=self.config["database"]["db-password"],
database=self.config["database"]["db-name"]
) as _connection:
self.connection = _connection
with self.connection.cursor() as cursor:
self.cursor = cursor
# запуск бесконечного цикла
self.__endless_loop()