tgbarebot/bot.py

188 lines
5.1 KiB
Python

from __future__ import annotations # Just to have better typing, but useless.
import json
import urllib.request
import urllib.parse
import time
import traceback
import logging
import re
import inspect
from typing import Callable
log = logging.getLogger(__name__) # Basic logger configuration.
log.addHandler(logging.NullHandler())
def _logwrap(fn):
def wrap(*args, **kwargs):
try:
return fn(*args, **kwargs)
except Exception as e:
log.debug(time.strftime("%l:%M%p %Z"))
log.debug(traceback.format_exc())
log.info(e)
return wrap
def get_kwargs():
frame = inspect.currentframe().f_back
keys, _, _, values = inspect.getargvalues(frame)
kwargs = {}
for key in keys:
if key != "self":
kwargs[key] = values[key]
return kwargs
class Bot:
def __init__(
self,
token: str,
commands: dict[str, Callable[[Bot, dict, dict], str]],
fallback=None,
setup=None,
):
self.token = token
self.url = "https://api.telegram.org/bot" + self.token + "/"
self.commands = commands
if fallback is None:
self.fallback = lambda *args: ""
else:
self.fallback = fallback
if setup is None:
self.setup = lambda *args: None
else:
self.setup = setup
self.setup()
@_logwrap
def get_updates(self, offset=None, timeout=100):
url = self.url + f"getUpdates?timeout={timeout}"
if offset:
url += f"&offset={offset}"
json_url = urllib.request.urlopen(url).read().decode()
js = json.loads(json_url)
return js
@_logwrap
def get_botinfo(self):
url = self.url + "getMe"
json_url = urllib.request.urlopen(url).read().decode()
js = json.loads(json_url)
return js
@_logwrap
def get_last_update_id(self, updates: dict):
update_ids = []
for update in updates["result"]:
update_ids.append(int(update["update_id"]))
return max(update_ids)
# TODO: debug urllib
@_logwrap
def get_chat_member(self, chat_id: int, user_id: int):
url = self.url + "getChatMember?chat_id={}&user_id={}".format(chat_id, user_id)
json_url = urllib.request.urlopen(url).read().decode()
js = json.loads(json_url)
return js
@_logwrap
def send(self, text: str, chat_id: int, parse_mode: str = "Markdown"):
if text != "":
params = get_kwargs()
if type(text) != str:
log.warn(f"WARNING: {text} is NOT a string!!!")
text = urllib.parse.quote_plus(text)
url = self.url + "sendMessage?" + urllib.parse.urlencode(params)
# log.debug(f"Sending text: {text}")
# log.debug(f"Url: {url}")
urllib.request.urlopen(url)
@_logwrap
def forwardMessage(
self,
chat_id: int,
from_chat_id: int,
message_id: int,
disable_notification: bool = False,
protect_content: bool = False,
):
url = self.url + "forwardMessage?"
params = get_kwargs()
query = urllib.parse.urlencode(params)
urllib.request.urlopen(url + query)
#
# @_logwrap
# def sendPhoto(self, chat_id: int, photo: str, caption: str, disable_notification: bool = False, protect_content: bool = False):
# url = self.url + "forwardMessage?"
# params = get_kwargs()
# query = urllib.parse.urlencode(params)
# urllib.request.urlopen(url + query)
#
def process(self, updates):
for update in updates["result"]:
try:
if "edited_message" in update.keys():
messagekind = "edited_message"
else:
messagekind = "message"
if "text" in update[messagekind].keys():
text = update[messagekind]["text"]
chat = update[messagekind]
elif "photo" in update[messagekind].keys():
text = update[messagekind]["caption"]
chat = update[messagekind]
else:
return
command = re.split(" |@", text)[0]
if command in self.commands:
self.send(
self.commands[command](self, text, chat), chat["chat"]["id"]
)
else:
self.send(self.fallback(self, text, chat), chat["chat"]["id"])
except Exception as e: # we're not using the log wrapper because we care about the update
logging.debug(time.strftime("%l:%M%p %Z"))
logging.debug(traceback.format_exc())
logging.debug(e)
logging.debug(str(update) + "\n")
def poll(self):
log.debug("started polling")
last_update_id = None
while True:
updates = self.get_updates(last_update_id)
if len(updates["result"]) > 0:
last_update_id = self.get_last_update_id(updates) + 1
self.process(updates)