From f067b126190ad72fa78e74c28150d77816deea9c Mon Sep 17 00:00:00 2001 From: Vladislav Date: Thu, 26 Oct 2023 20:58:33 +0300 Subject: [PATCH] First commit --- .gitignore | 3 + README.md | 3 + endpoint.py | 221 ++++++++++++++++++++++++++++++++++++ example-config/example.json | 26 +++++ journal.py | 4 + main.py | 19 ++++ requirements.txt | 3 + 7 files changed, 279 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 endpoint.py create mode 100644 example-config/example.json create mode 100644 journal.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..aa969b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/.idea/ +nodes +mb-tcp-logger.iml diff --git a/README.md b/README.md new file mode 100644 index 0000000..7a52793 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# mb-tcp-logger + +Проект создан для организации логгирования всего, что происходит с ПЛК по протоколу Modbus TCP. diff --git a/endpoint.py b/endpoint.py new file mode 100644 index 0000000..857cc2a --- /dev/null +++ b/endpoint.py @@ -0,0 +1,221 @@ +import time +from pyModbusTCP.client import ModbusClient +from mysql.connector import connect +from datetime import datetime, timedelta +from threading import Thread +import journal + + +# SUPPORTED_REGISTER_TYPES = ['D', 'M'] +SUPPORTED_REGISTER_TYPES = ['D'] + + +LOG_TYPE_ON_SCAN = 'on-scan' +LOG_TYPE_ON_CHANGE = 'on-change' + + +def check_duplicates(regs: list): + for i in range(0, len(regs) - 1): + for j in range(i + 1, len(regs)): + if regs[i][0] == regs[j][0] or regs[i][1] == regs[j][1]: + raise Exception(f"Found register duplicates of {regs[i]}") + return False + + +def get_reg_id(reg): + r_type = reg[0][0] + if r_type not in SUPPORTED_REGISTER_TYPES: + raise Exception(f"Invalid register type {reg} ({r_type})") + r_val = int(reg[0][1:]) + r_name = reg[1] + return r_type, r_val, r_name + + +def find_min_reg(regs: list): + ret = regs[0] + r = get_reg_id(regs[0]) + for reg in regs: + rt = get_reg_id(reg) + # [0] - тип, [1] - номер, [2] - имя + if r[0] == rt[0] and r[1] > rt[1]: + r = rt + ret = reg + + return ret + + +def find_range(regs: list): + # [0] - тип, [1] - номер, [2] - имя + start_reg = find_min_reg(regs) + # сразу удалим запись из списка + regs.remove(start_reg) + start_reg = get_reg_id(start_reg) + reg_names = [start_reg[2]] + + # тип регистра содержится в первой букве его названия + reg_type = start_reg[0] + + # стартовый регистр + reg_id = start_reg[1] + + # регистров у нас как минимум 1 (этот же как-то нашли) + regs_count = 1 + + # теперь пытаемся найти все регистры + while True: + found_r = None + for rd in regs: + r = get_reg_id(rd) + if r[0] == reg_type and r[1] - regs_count == reg_id: + regs_count += 1 + found_r = rd + reg_names.append(r[2]) + break + if found_r is not None: + regs.remove(found_r) + else: + break + + return { + "reg_type": reg_type, + "reg_addr": reg_id, + "count": regs_count, + "names": tuple(reg_names) + } + + +def parse_ranges(config): + regs = list(config["registers"]) + check_duplicates(regs) + + ranges = [] + while len(regs) > 0: + ranges.append(find_range(regs)) + + return ranges + + +class Endpoint(Thread): + def __init__(self, config): + super().__init__() + # применяем настройки modbus + self.config = config + self.mb = None + self.log_type = config["modbus"]["log-type"] + if self.log_type not in [LOG_TYPE_ON_CHANGE, LOG_TYPE_ON_SCAN]: + raise Exception(f"Unsupported logging type '{self.log_type}'") + + # список диапазонов и значений + self.ranges = parse_ranges(config) + + # база данных + self.connection = None + self.cursor = None + self.db_table = config['database']['db-table'] + self.db_col_names = '' + for rn in self.ranges: + for name in rn['names']: + if len(self.db_col_names) != 0: + self.db_col_names += ', ' + self.db_col_names += f'`{name}`' + + if config['database']['driver'] != "mysql": + raise Exception(f"Unsupported database driver {config['database']['driver']}") + + def __mb_read_all_regs(self): + values = [] + for rn in self.ranges: + if rn['reg_type'] == 'D': + res = self.mb.read_holding_registers(rn['reg_addr'], rn['count']) + if not res: + raise Exception("Unable to read registers") + + for v in res: + values.append(v) + return values + + def __read_last_db_regs(self): + self.cursor.execute(f"select {self.db_col_names} from `{self.db_table}`" + f"order by `{self.config['database']['db-datetime-col']}` desc limit 1;") + row = self.cursor.fetchall() + if len(row) != 0: + return row[0] + else: + return None + + def __insert_db_row(self, values): + vals_str = "" + for v in values: + if len(vals_str) != 0: + vals_str += f", {v}" + else: + vals_str += f"{v}" + + query = f"insert into `{self.db_table}` ({self.db_col_names}) values ({vals_str});" + journal.log(f"Executing query: {query}") + self.cursor.execute(query) + + def __endless_loop(self): + # if self.log_type == 'on-change': + # pass + # else: + last_query = datetime.now() + scan_rate = timedelta(microseconds=self.config['modbus']['scan-rate'] * 1000) + if self.log_type == LOG_TYPE_ON_CHANGE: + last_row = self.__read_last_db_regs() + else: + last_row = None + + while True: + # вычислим время до следующего опроса + curr_time = datetime.now() + need_time = last_query + scan_rate + if need_time > curr_time: + delta = need_time - curr_time + time.sleep(delta.seconds + (delta.microseconds / 1000000)) + while True: + try: + row = self.__mb_read_all_regs() + if self.log_type == LOG_TYPE_ON_SCAN: + self.__insert_db_row(row) + last_row = row + else: + equal = True + if last_row is None: + equal = False + else: + for i in range(0, len(last_row)): + if last_row[i] != row[i]: + equal = False + break + if not equal: + self.__insert_db_row(row) + last_row = row + + last_query = need_time + break + except Exception as ex: + journal.log(f"Exception: {ex}") + time.sleep(0.1) # небольшая задержка, чтоб не спамить запросами + need_time = datetime.now() + scan_rate + + def run(self): + # запускаем клиента модбас + self.mb = ModbusClient( + host=self.config["modbus"]["host"], + port=self.config["modbus"]["port"], + debug=self.config["modbus"]["debug"], + auto_open=True) + with connect( + host=self.config["database"]["host"], + port=self.config["database"]["port"], + user=self.config["database"]["db-user"], + password=self.config["database"]["db-password"], + database=self.config["database"]["db-name"] + ) as _connection: + self.connection = _connection + with self.connection.cursor() as cursor: + self.cursor = cursor + + # запуск бесконечного цикла + self.__endless_loop() diff --git a/example-config/example.json b/example-config/example.json new file mode 100644 index 0000000..4ca88c3 --- /dev/null +++ b/example-config/example.json @@ -0,0 +1,26 @@ +{ + "type": "mb-tcp-logger", + "database": { + "driver": "mysql", + "host": "ip_or_domain", + "port": 3306, + "db-user": "database_user", + "db-password": "database_password", + "db-name": "database_table", + "db-table": "table for insert values", + "db-datetime-col": "dt" + }, + "modbus": { + "host": "ip_or_domain", + "port": 1234, + "scan-rate": 1000, + "log-type": "on-change OR on-scan", + "debug": false, + "systemd-log": "on-change OR on-scan OR errors OR none" + }, + "registers": [ + ["D1", "col_name1"], + ["D2", "col_name2"], + ["D3", "col_name3"] + ] +} \ No newline at end of file diff --git a/journal.py b/journal.py new file mode 100644 index 0000000..098a5bb --- /dev/null +++ b/journal.py @@ -0,0 +1,4 @@ + +def log(message): + print(message) + diff --git a/main.py b/main.py new file mode 100644 index 0000000..0b9e047 --- /dev/null +++ b/main.py @@ -0,0 +1,19 @@ +import json +import os +from endpoint import Endpoint +import journal + +CONFIG_DIR = "nodes/" + + +def start(): + configs = os.listdir(CONFIG_DIR) + for config in configs: + journal.log(f"Creating {CONFIG_DIR}{config}") + with open(CONFIG_DIR + config, "r") as f: + endpoint = Endpoint(json.load(f)) + endpoint.start() + + +if __name__ == '__main__': + start() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..00f8875 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pyModbusTCP==0.2.0 +python-dotenv==1.0.0 +mysql-connector-python==8.2.0