import json import os import random import traceback import shlex import vk_api from vk_api.longpoll import VkLongPoll, VkEventType, VkLongpollMode from datetime import datetime import time import dotenv dotenv.load_dotenv() vk_session = vk_api.VkApi(token=os.getenv('VK_ACCESS_TOKEN'), api_version='5.131') vk = vk_session.get_api() # массив peer_id, которым можно доверять TRUSTED_USERS = eval(os.getenv('TRUSTED_USERS')) class Logger: def __init__(self, filename=None): self.filename = filename def log(self, message): if self.filename is not None: with open(self.filename, 'a') as f: f.write(f"[{datetime.now().strftime('%d %b %Y %H:%M:%S')}] {message}\n") msg_logger = Logger('messages-log.log') class StickersCache: __cache = {} @staticmethod def get_keywords(sticker_id: int): global vk if sticker_id in StickersCache.__cache: return StickersCache.__cache[sticker_id] try: res = vk.store.getStickersKeywords(stickers_ids=sticker_id) words = res['dictionary'][0]['words'] if type(words) == list: words = words[:3] StickersCache.__cache[sticker_id] = words return words except Exception: return "" def __create_msg_string(event): peer_id = event.peer_id user_id = event.user_id if event.from_me: user_id = 0 text = event.text.replace('\n', '
') if 'attachments' in event.attachments: attachments = json.loads(event.attachments['attachments']) else: attachments = event.attachments if type(attachments) == list and len(attachments) == 1: if attachments[0]['type'] == "sticker": # узнаем текст для ID стикера text = f"sticker keyword: {StickersCache.get_keywords(attachments[0]['sticker']['sticker_id'])}" attachments = [{ "type": "sticker", "sticker": { "product_id": attachments[0]['sticker']['product_id'], "sticker_id": attachments[0]['sticker']['sticker_id'] } }] return f"id={event.message_id} peer={peer_id} user_id={user_id} text=\"{text}\" attachments={attachments}" def log_new_message(event): msg_logger.log(f"message_new {__create_msg_string(event)}") def log_message_edit(event): msg_logger.log(f"message_edit {__create_msg_string(event)}") def log_message_delete(event): peer_id = event.peer_id user_id = event.user_id if event.from_me: user_id = 0 msg_logger.log(f"message_del id={event.message_id} peer={peer_id} user_id={user_id}") def log_message_recovery(event): msg_logger.log(f"message_rec {__create_msg_string(event)}") def cmd_handler(event, *args): if event.peer_id not in TRUSTED_USERS: print(f"Команда {args}: отказано в доступе - недоверенный пользователь id={event.peer_id}") else: print(f"Execute {args}") if len(args) >= 1: if args[0] == "рандом" or args[0] == "random": if len(args) != 3: vk.messages.send(peer_id=event.peer_id, random_id=random.randint(0, 0x7FFFFFFF), message="Использование команды рандом: /рандом <от> <до>") else: try: start = int(args[1]) end = int(args[2]) vk.messages.send(peer_id=event.peer_id, random_id=random.randint(0, 0x7FFFFFFF), message=f"Результат: {random.randint(start, end)}") except Exception: vk.messages.send(peer_id=event.peer_id, random_id=random.randint(0, 0x7FFFFFFF), message="Один из аргументов не является числом!") if args[0] == "выбери" or args[0] == "select": if len(args) == 1: vk.messages.send(peer_id=event.peer_id, random_id=random.randint(0, 0x7FFFFFFF), message="Использование команды выбери: /выбери <одно> [второе] [третье] ...") else: vk.messages.send(peer_id=event.peer_id, random_id=random.randint(0, 0x7FFFFFFF), message=f"Результат: {random.choice(args[2:])}") def lp_loop(): mode = VkLongpollMode.GET_ATTACHMENTS | VkLongpollMode.GET_EXTENDED | VkLongpollMode.GET_PTS |\ VkLongpollMode.GET_EXTRA_ONLINE | VkLongpollMode.GET_RANDOM_ID lp = VkLongPoll(vk_session, mode=mode) vk.account.setOnline(voip=0) while True: try: for event in lp.listen(): if event.type == VkEventType.MESSAGE_NEW: log_new_message(event) # event.from_me and if event.from_user and event.text.startswith("/"): first_line = event.text[1:].split('\n')[0].replace('"', '"') cmd_handler(event, *shlex.split(first_line)) elif event.type == VkEventType.MESSAGE_EDIT: log_message_edit(event) elif event.type == VkEventType.MESSAGE_FLAGS_SET: if (event.mask & 128) != 0: log_message_delete(event) elif event.type == VkEventType.MESSAGE_FLAGS_RESET: if (event.mask & 128) != 0: log_message_recovery(event) # elif event.type == VkEventType.USER_ONLINE: # print('Пользователь', event.user_id, 'онлайн', event.platform) # # elif event.type == VkEventType.USER_OFFLINE: # print('Пользователь', event.user_id, 'оффлайн', event.offline_type) elif event.type == VkEventType.MESSAGES_COUNTER_UPDATE: pass # не принимаем такие события else: print(event.type, event.raw[1:]) except Exception: traceback.print_exc() time.sleep(1) if __name__ == '__main__': lp_loop()