188 lines
5.1 KiB
Python
188 lines
5.1 KiB
Python
from __future__ import annotations # Just to have better typing, but useless.
|
|
import json
|
|
import urllib.request
|
|
import urllib.parse
|
|
|
|
import time
|
|
import traceback
|
|
import logging
|
|
import re
|
|
import inspect
|
|
|
|
from typing import Callable
|
|
|
|
# Module-level logger, named after this module so applications can configure
# it (level, handlers) independently of other loggers.
log = logging.getLogger(__name__)
# A NullHandler gives the logger a handler that does nothing, so this module
# emits no output of its own until the application sets up logging.
log.addHandler(logging.NullHandler())
|
|
|
|
|
|
def _logwrap(fn):
    """Decorate *fn* so that exceptions it raises are logged, not propagated.

    The returned wrapper forwards all positional and keyword arguments to
    ``fn`` and returns its result.  If ``fn`` raises an ``Exception``, the
    wrapper logs a timestamp and the traceback at DEBUG level, logs the
    exception itself at INFO level, and returns ``None`` instead of raising.

    Callers of a wrapped function must therefore be prepared to receive
    ``None`` whenever the underlying call failed.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(fn)  # preserve fn's __name__/__doc__ on the wrapper
    def wrap(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            # "%I" is the documented 12-hour directive.  The previous "%l" is
            # a C-library extension that Python does not guarantee on every
            # platform; failing here would defeat the purpose of the wrapper.
            log.debug(time.strftime("%I:%M%p %Z"))
            log.debug(traceback.format_exc())

            log.info(e)
            return None

    return wrap
|
|
|
|
|
|
def get_kwargs():
    """Return the calling function's named arguments as a dict, minus ``self``.

    The caller's frame is inspected, and each of its named parameters is
    mapped to the value currently bound to it in that frame.  A parameter
    the caller has reassigned before calling this function is therefore
    reported with its new value.  ``*args`` / ``**kwargs`` catch-alls are
    not included.

    Returns:
        dict mapping parameter name to its current value in the caller.

    Raises:
        RuntimeError: if the interpreter provides no stack frame support.
    """
    current = inspect.currentframe()
    caller = current.f_back if current is not None else None
    if caller is None:
        raise RuntimeError("get_kwargs() requires Python stack frame support")

    try:
        names, _, _, frame_locals = inspect.getargvalues(caller)
        return {name: frame_locals[name] for name in names if name != "self"}
    finally:
        # Drop the frame references explicitly so they cannot keep the
        # caller's locals alive through a reference cycle.
        del current, caller
|
|
|
|
|
|
class Bot:
    """Minimal long-polling Telegram bot built on ``urllib``.

    The bot talks to ``https://api.telegram.org/bot<token>/``.  Incoming
    messages are dispatched on their first word (split on a space or ``@``)
    to a handler from ``commands``; unknown commands go to ``fallback``.
    Handlers are called as ``handler(bot, text, message)`` and the string
    they return is sent back to the chat the message came from.

    Methods decorated with ``_logwrap`` never raise: on failure the error is
    logged and the method returns ``None``.
    """

    def __init__(
        self,
        token: str,
        commands: dict[str, Callable[[Bot, str, dict], str]],
        fallback=None,
        setup=None,
    ):
        """Store the configuration and run the ``setup`` hook once.

        Args:
            token: Bot API token, embedded in every request URL.
            commands: Maps a command word (e.g. ``"/start"``) to its handler.
            fallback: Handler used when no command matches.  Defaults to a
                function returning ``""`` (nothing is sent).
            setup: Callable invoked with no arguments at the end of
                construction.  Defaults to a no-op.
        """
        self.token = token
        self.url = "https://api.telegram.org/bot" + self.token + "/"
        self.commands = commands

        self.fallback = fallback if fallback is not None else (lambda *args: "")
        self.setup = setup if setup is not None else (lambda *args: None)

        self.setup()

    def _call(self, method: str, params: dict | None = None):
        """Request API *method* with *params* and return the decoded JSON.

        The response is always closed.  Errors are not handled here; they
        propagate to the (log-wrapped) public method that made the call.
        """
        url = self.url + method
        if params:
            url += "?" + urllib.parse.urlencode(params)
        with urllib.request.urlopen(url) as response:
            return json.loads(response.read().decode())

    @_logwrap
    def get_updates(self, offset=None, timeout=100):
        """Fetch pending updates via ``getUpdates``.

        Args:
            offset: Passed as the ``offset`` query parameter when truthy.
            timeout: Passed as the ``timeout`` query parameter.

        Returns:
            The decoded JSON response, or ``None`` if the request failed.
        """
        params = {"timeout": timeout}
        if offset:
            params["offset"] = offset
        return self._call("getUpdates", params)

    @_logwrap
    def get_botinfo(self):
        """Return the decoded ``getMe`` response, or ``None`` on failure."""
        return self._call("getMe")

    @_logwrap
    def get_last_update_id(self, updates: dict):
        """Return the largest ``update_id`` in ``updates["result"]``.

        Returns ``None`` (via the log wrapper) if the list is empty or an
        entry has no usable ``update_id``.
        """
        return max(int(update["update_id"]) for update in updates["result"])

    # TODO: debug urllib
    @_logwrap
    def get_chat_member(self, chat_id: int, user_id: int):
        """Return the decoded ``getChatMember`` response, or ``None`` on failure."""
        return self._call(
            "getChatMember", {"chat_id": chat_id, "user_id": user_id}
        )

    @_logwrap
    def send(self, text: str, chat_id: int, parse_mode: str = "Markdown"):
        """Send *text* to *chat_id* via ``sendMessage``.

        An empty string is silently ignored.  Text that is not a ``str`` is
        reported with a warning and not sent.  Returns ``None`` in all cases.
        """
        if text == "":
            return

        if not isinstance(text, str):
            log.warning(f"WARNING: {text} is NOT a string!!!")
            return

        # urlencode (inside _call) performs the URL quoting, so the text must
        # not be quoted separately beforehand.
        params = get_kwargs()
        self._call("sendMessage", params)

    @_logwrap
    def forwardMessage(
        self,
        chat_id: int,
        from_chat_id: int,
        message_id: int,
        disable_notification: bool = False,
        protect_content: bool = False,
    ):
        """Forward a message via ``forwardMessage``.

        Every parameter of this method is sent as a query parameter of the
        same name.  Returns ``None``.
        """
        params = get_kwargs()
        self._call("forwardMessage", params)

    # TODO: sendPhoto is not implemented yet.

    def process(self, updates):
        """Dispatch every update in ``updates["result"]`` to its handler.

        For each update the ``edited_message`` entry is used if present,
        otherwise ``message``.  The command text is the message ``text``, or
        the ``caption`` of a photo message.  Updates carrying neither are
        skipped.  Any error while handling one update is logged together
        with that update, and processing continues with the next one.
        """
        for update in updates["result"]:
            try:
                kind = "edited_message" if "edited_message" in update else "message"
                message = update[kind]

                if "text" in message:
                    text = message["text"]
                elif "photo" in message and "caption" in message:
                    text = message["caption"]
                else:
                    # Nothing to dispatch on.  Skip only this update: a
                    # `return` here would drop the rest of the batch.
                    continue

                command = re.split(" |@", text)[0]

                if command in self.commands:
                    handler = self.commands[command]
                else:
                    handler = self.fallback

                self.send(handler(self, text, message), message["chat"]["id"])

            except Exception as e:  # we're not using the log wrapper because we care about the update
                log.debug(time.strftime("%I:%M%p %Z"))
                log.debug(traceback.format_exc())
                log.debug(e)
                log.debug(str(update) + "\n")

    def poll(self, retry_delay: float = 1.0):
        """Fetch and process updates forever.

        Args:
            retry_delay: Seconds to wait before retrying after a failed or
                malformed ``getUpdates`` response.

        This method does not return.
        """
        log.debug("started polling")
        last_update_id = None
        while True:
            updates = self.get_updates(last_update_id)

            # get_updates returns None when the request failed; an error
            # response may also lack "result".  Back off briefly and retry
            # instead of crashing the loop.
            if not updates or "result" not in updates:
                time.sleep(retry_delay)
                continue

            if len(updates["result"]) > 0:
                last_update_id = self.get_last_update_id(updates) + 1

                self.process(updates)