Formatted code, minor changes, changed the way the telegram api is encoded (to make implementing the rest of the API easier)

This commit is contained in:
Raphy 2022-02-01 10:14:12 +01:00
parent e6650f0bde
commit fe8871b0ae
2 changed files with 61 additions and 19 deletions

View File

@ -50,6 +50,8 @@ def main():
b = Bot(token, commands) # feed the bot with your token and a dict of commands.
b.poll()
if __name__ == "__main__":
main()
```
@ -92,4 +94,3 @@ The library makes use of python's standard logging lib. See the example main to
- Add more functions to Bot (ideally the entire telegram api)
- Document the functions better
- Explain how to hack state into this thing (there's a way to do it, but it's fairly painful)
Explain how to hack state into this thing (there's a way to do it, but it's fairly painful)

77
bot.py
View File

@ -1,4 +1,4 @@
from __future__ import annotations # Just to have better typing, but useless.
from __future__ import annotations # Just to have better typing, but useless.
import json
import urllib.request
import urllib.parse
@ -7,26 +7,45 @@ import time
import traceback
import logging
import re
import inspect
from typing import Callable
log = logging.getLogger(__name__) # Basic logger configuration.
log = logging.getLogger(__name__) # Basic logger configuration.
log.addHandler(logging.NullHandler())
def _logwrap(fn):
def wrap(*args, **kwargs):
try:
return fn(*args, **kwargs)
except Exception as e:
log.debug(time.strftime('%l:%M%p %Z'))
log.debug(time.strftime("%l:%M%p %Z"))
log.debug(traceback.format_exc())
log.info(e)
return wrap
def get_kwargs():
frame = inspect.currentframe().f_back
keys, _, _, values = inspect.getargvalues(frame)
kwargs = {}
for key in keys:
if key != "self":
kwargs[key] = values[key]
return kwargs
class Bot:
def __init__(self, token : str, commands : dict[str, Callable[[Bot, dict, dict], str]], fallback=None, setup=None):
def __init__(
self,
token: str,
commands: dict[str, Callable[[Bot, dict, dict], str]],
fallback=None,
setup=None,
):
self.token = token
self.url = "https://api.telegram.org/bot" + self.token + "/"
@ -44,14 +63,12 @@ class Bot:
self.setup()
@_logwrap
def get_updates(self, offset=None):
url = self.url + "getUpdates?timeout=100"
def get_updates(self, offset=None, timeout=100):
url = self.url + f"getUpdates?timeout={timeout}"
if offset:
url += "&offset={}".format(offset)
url += f"&offset={offset}"
json_url = urllib.request.urlopen(url).read().decode()
js = json.loads(json_url)
@ -85,15 +102,40 @@ class Bot:
return js
@_logwrap
def send(self, text: str, chat_id: int):
def send(self, text: str, chat_id: int, parse_mode: str = "Markdown"):
if text != "":
params = get_kwargs()
text = urllib.parse.quote_plus(text)
url = self.url + "sendMessage?text={}&chat_id={}&parse_mode=Markdown".format(text, chat_id)
url = self.url + "sendMessage?" + urllib.parse.urlencode(params)
urllib.request.urlopen(url)
@_logwrap
def forwardMessage(
self,
chat_id: int,
from_chat_id: int,
message_id: int,
disable_notification: bool = False,
protect_content: bool = False,
):
url = self.url + "forwardMessage?"
params = get_kwargs()
query = urllib.parse.urlencode(params)
urllib.request.urlopen(url + query)
#
# @_logwrap
# def sendPhoto(self, chat_id: int, photo: str, caption: str, disable_notification: bool = False, protect_content: bool = False):
# url = self.url + "forwardMessage?"
# params = get_kwargs()
# query = urllib.parse.urlencode(params)
# urllib.request.urlopen(url + query)
#
def process(self, updates):
for update in updates["result"]:
try:
@ -114,15 +156,17 @@ class Bot:
command = re.split(" |@", text)[0]
if command in self.commands:
self.send(self.commands[command](self, text, chat), chat["chat"]["id"])
self.send(
self.commands[command](self, text, chat), chat["chat"]["id"]
)
else:
self.send(self.fallback(self, text, chat), chat["chat"]["id"])
except Exception as e: # we're not using the log wrapper because we care about the update
logging.debug(time.strftime('%l:%M%p %Z'))
except Exception as e: # we're not using the log wrapper because we care about the update
logging.debug(time.strftime("%l:%M%p %Z"))
logging.debug(traceback.format_exc())
logging.debug(e)
logging.debug(str(update) + "\n")
logging.debug(str(update) + "\n")
def poll(self):
log.debug("started polling")
@ -135,6 +179,3 @@ class Bot:
last_update_id = self.get_last_update_id(updates) + 1
self.process(updates)
time.sleep(0.5)