mb-tcp-logger/endpoint.py

224 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from pyModbusTCP.client import ModbusClient
from mysql.connector import connect
from datetime import datetime, timedelta
from threading import Thread
import journal
# Register type letters accepted as the first character of a register id.
# Only 'D' is enabled; the variant that also listed 'M' is kept below, disabled.
# SUPPORTED_REGISTER_TYPES = ['D', 'M']
SUPPORTED_REGISTER_TYPES = ['D']
# Accepted values of config["modbus"]["log-type"]:
# 'on-scan' stores a row on every scan, 'on-change' stores a row only when
# the values read differ from the last stored row.
LOG_TYPE_ON_SCAN = 'on-scan'
LOG_TYPE_ON_CHANGE = 'on-change'
def check_duplicates(regs: list):
    """Reject register lists in which two entries share an id or a name.

    Every entry is compared with every later entry: element ``[0]`` against
    element ``[0]`` and element ``[1]`` against element ``[1]``.

    Returns:
        ``False`` when no pair of entries collides.

    Raises:
        Exception: on the first colliding pair; the message shows the earlier
            entry of that pair.
    """
    for position, earlier in enumerate(regs):
        for later in regs[position + 1:]:
            same_id = earlier[0] == later[0]
            if same_id or earlier[1] == later[1]:
                raise Exception(f"Found register duplicates of {earlier}")
    return False
def get_reg_id(reg):
    """Split a register entry into ``(type, address, name)``.

    ``reg[0]`` is the register id: one type letter followed by a decimal
    address. ``reg[1]`` is the register name and is returned unchanged.

    Returns:
        A tuple ``(r_type, r_val, r_name)`` where ``r_type`` is the type
        letter, ``r_val`` the address as ``int`` and ``r_name`` the name.

    Raises:
        ValueError: if the id is empty, its type letter is not listed in
            ``SUPPORTED_REGISTER_TYPES``, or the address part is not a
            non-negative integer. ``ValueError`` is a subclass of
            ``Exception``, so callers catching ``Exception`` still work.
    """
    reg_id = reg[0]
    # Checked first so that an empty id yields a readable error instead of an
    # IndexError from reg_id[0].
    if not reg_id:
        raise ValueError(f"Empty register id in {reg}")
    r_type = reg_id[0]
    if r_type not in SUPPORTED_REGISTER_TYPES:
        raise ValueError(f"Invalid register type {reg} ({r_type})")
    try:
        r_val = int(reg_id[1:])
    except ValueError:
        # Re-raise with the offending entry named; the bare int() message
        # does not say which register was malformed.
        raise ValueError(f"Invalid register address in {reg}") from None
    if r_val < 0:
        raise ValueError(f"Negative register address in {reg}")
    r_name = reg[1]
    return r_type, r_val, r_name
def find_min_reg(regs: list):
    """Return the entry with the lowest address among one register type.

    The type is taken from the first entry of *regs*; entries of other types
    are ignored. When several entries share the lowest address, the one that
    appears first in *regs* is returned. Every entry is parsed with
    ``get_reg_id``, so an invalid entry anywhere in the list raises.
    """
    wanted_type = get_reg_id(regs[0])[0]
    candidates = []
    for entry in regs:
        # parsed is (type, address, name)
        parsed = get_reg_id(entry)
        if parsed[0] == wanted_type:
            candidates.append((parsed[1], entry))
    # min() keeps the first of equal keys, matching a strict "less than" scan.
    lowest = min(candidates, key=lambda pair: pair[0])
    return lowest[1]
def find_range(regs: list, max_count: int = 125):
    """Extract one run of consecutive registers from *regs*.

    The run starts at the entry returned by ``find_min_reg`` and is extended
    while *regs* contains an entry of the same type whose address directly
    follows the run. Every entry placed in the run is removed from *regs*
    (the list is modified in place), so repeated calls consume the list.

    Args:
        regs: register entries; must not be empty.
        max_count: upper bound on the length of the run. The default, 125, is
            the maximum number of registers a single Modbus Read Holding
            Registers request can carry; a longer run could not be read in
            one request. Registers beyond the bound stay in *regs* and form
            the next run.

    Returns:
        A dict with keys ``"reg_type"`` (type letter), ``"reg_addr"`` (first
        address), ``"count"`` (number of registers) and ``"names"`` (tuple of
        register names in address order).

    Raises:
        ValueError: if *max_count* is smaller than 1.
    """
    if max_count < 1:
        raise ValueError(f"max_count must be at least 1, got {max_count}")
    start_entry = find_min_reg(regs)
    # The start entry is consumed right away.
    regs.remove(start_entry)
    # get_reg_id yields (type, address, name)
    reg_type, reg_addr, start_name = get_reg_id(start_entry)
    # The run holds at least the start register.
    reg_names = [start_name]
    while len(reg_names) < max_count:
        wanted_addr = reg_addr + len(reg_names)
        found_entry = None
        for entry in regs:
            entry_type, entry_addr, entry_name = get_reg_id(entry)
            if entry_type == reg_type and entry_addr == wanted_addr:
                found_entry = entry
                reg_names.append(entry_name)
                break
        if found_entry is None:
            # Nothing continues the run: it is complete.
            break
        regs.remove(found_entry)
    return {
        "reg_type": reg_type,
        "reg_addr": reg_addr,
        "count": len(reg_names),
        "names": tuple(reg_names)
    }
def parse_ranges(config):
    """Group ``config["registers"]`` into runs of consecutive registers.

    The register list is copied first, so the configuration itself is not
    modified. The copy is validated with ``check_duplicates`` and then handed
    to ``find_range`` repeatedly until it is empty.

    Returns:
        A list of the range dicts produced by ``find_range``, in the order
        they were found.
    """
    pending = list(config["registers"])
    check_duplicates(pending)
    found_ranges = []
    # find_range removes the entries it uses, so the loop ends when all
    # registers have been assigned to a range.
    while pending:
        found_ranges.append(find_range(pending))
    return found_ranges
class Endpoint(Thread):
    """Thread that polls Modbus TCP registers and logs the values to MySQL.

    The registers listed in ``config["registers"]`` are grouped into ranges
    by ``parse_ranges``. Each scan reads every range and, depending on
    ``config["modbus"]["log-type"]``, inserts a row on every scan
    (``LOG_TYPE_ON_SCAN``) or only when the values differ from the last
    stored row (``LOG_TYPE_ON_CHANGE``).
    """

    def __init__(self, config):
        """Validate *config* and precompute register ranges and column list.

        Raises:
            Exception: if the log type or the database driver is unsupported,
                or if ``parse_ranges`` rejects the register list.
        """
        super().__init__()
        # Apply the Modbus settings; the client itself is created in run().
        self.config = config
        self.mb = None
        self.log_type = config["modbus"]["log-type"]
        if self.log_type not in [LOG_TYPE_ON_CHANGE, LOG_TYPE_ON_SCAN]:
            raise Exception(f"Unsupported logging type '{self.log_type}'")
        # List of register ranges.
        self.ranges = parse_ranges(config)
        # Database: table name and the backtick-quoted column list, in the
        # same order in which __mb_read_all_regs returns the values.
        self.db_table = config['database']['db-table']
        self.db_col_names = ', '.join(
            f'`{name}`' for rn in self.ranges for name in rn['names']
        )
        if config['database']['driver'] != "mysql":
            raise Exception(f"Unsupported database driver {config['database']['driver']}")

    def __db_connect(self):
        """Open a new database connection from the ``database`` settings."""
        db_config = self.config["database"]
        return connect(
            host=db_config["host"],
            port=db_config["port"],
            user=db_config["db-user"],
            password=db_config["db-password"],
            database=db_config["db-name"],
        )

    @staticmethod
    def __rows_differ(last_row, row) -> bool:
        """Tell whether *row* must be stored given the previous *last_row*.

        Returns ``True`` when there is no previous row, when the lengths
        differ, or when any pair of values at the same position is unequal.
        Values are compared element by element because the previous row may
        be a tuple (from the database) while *row* is a list.
        """
        if last_row is None:
            return True
        if len(last_row) != len(row):
            return True
        return any(old != new for old, new in zip(last_row, row))

    def __mb_read_all_regs(self) -> list:
        """Read every 'D' range and return all values as one flat list.

        Raises:
            Exception: if a read returns an empty or falsy result.
        """
        values = []
        for rn in self.ranges:
            if rn['reg_type'] == 'D':
                res = self.mb.read_holding_registers(rn['reg_addr'], rn['count'])
                if not res:
                    raise Exception("Unable to read registers")
                values.extend(res)
        return values

    def __read_last_db_regs(self):
        """Return the most recent stored row, or ``None`` if the table is empty.

        The row is selected by descending order of the column named in
        ``config["database"]["db-datetime-col"]``.
        """
        # The space after the table name keeps the identifier and the
        # ORDER BY keyword apart in the concatenated statement.
        query = (
            f"select {self.db_col_names} from `{self.db_table}` "
            f"order by `{self.config['database']['db-datetime-col']}` desc limit 1;"
        )
        with self.__db_connect() as connection:
            with connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
                return rows[0] if rows else None

    def __insert_db_row(self, values) -> None:
        """Insert *values* as one row, in the order of ``db_col_names``."""
        vals_str = ", ".join(f"{v}" for v in values)
        with self.__db_connect() as connection:
            query = f"insert into `{self.db_table}` ({self.db_col_names}) values ({vals_str});"
            journal.log(f"Executing query: {query}")
            with connection.cursor() as cursor:
                cursor.execute(query)
            # mysql.connector does not autocommit by default: without this
            # call the insert is discarded when the connection closes on
            # transactional storage engines.
            connection.commit()

    def __endless_loop(self) -> None:
        """Scan the registers forever at the configured rate.

        ``config["modbus"]["scan-rate"]`` is taken as milliseconds. Errors
        raised while reading or storing a row are logged and the loop goes
        on. The initial read of the last stored row is not guarded, so a
        failure there propagates to the caller.
        """
        last_query = datetime.now()
        scan_rate = timedelta(milliseconds=self.config['modbus']['scan-rate'])
        if self.log_type == LOG_TYPE_ON_CHANGE:
            # Start from the last stored row so that a restart does not
            # store an unchanged row again.
            last_row = self.__read_last_db_regs()
        else:
            last_row = None
        while True:
            try:
                row = self.__mb_read_all_regs()
                if self.log_type == LOG_TYPE_ON_SCAN or self.__rows_differ(last_row, row):
                    self.__insert_db_row(row)
                    last_row = row
            except Exception as ex:
                journal.log(f"Exception: {ex}")
            # Work out the time of the next scan and wait for it.
            need_time = last_query + scan_rate
            curr_time = datetime.now()
            if need_time > curr_time:
                # total_seconds() also accounts for whole days, which
                # seconds + microseconds would drop.
                time.sleep((need_time - curr_time).total_seconds())
                last_query = need_time
            else:
                # The scan overran its slot: restart the schedule from now.
                last_query = datetime.now()

    def run(self):
        """Create the Modbus client and enter the scan loop."""
        # Start the Modbus client.
        self.mb = ModbusClient(
            host=self.config["modbus"]["host"],
            port=self.config["modbus"]["port"],
            debug=self.config["modbus"]["debug"],
            auto_open=True)
        # Start the endless loop.
        self.__endless_loop()