initial commit for BotEventHandler and Broadcaster

This commit is contained in:
Jannes Höke 2015-11-05 13:52:33 +01:00
parent d3d5c1e907
commit 0688691974
2 changed files with 554 additions and 0 deletions

102
telegram/boteventhandler.py Normal file
View file

@ -0,0 +1,102 @@
#!/usr/bin/env python
"""
This module contains the class BotEventHandler, which tries to make creating
Telegram Bots intuitive!
"""
import sys
from threading import Thread
from telegram import (Bot, TelegramError, TelegramObject, Broadcaster)
from time import sleep
# Adjust for differences in Python versions
if sys.version_info.major is 2:
from Queue import Queue
elif sys.version_info.major is 3:
from queue import Queue
class BotEventHandler(TelegramObject):
"""
This class provides a frontend to telegram.Bot to the programmer, so they
can focus on coding the bot. I also runs in a separate thread, so the user
can interact with the bot, for example on the command line. It supports
Handlers for different kinds of data: Updates from Telegram, basic text
commands and even arbitrary types.
Attributes:
Args:
token (str): The bots token given by the @BotFather
**kwargs: Arbitrary keyword arguments.
Keyword Args:
base_url (Optional[str]):
"""
def __init__(self, token, base_url=None):
self.bot = Bot(token, base_url)
self.update_queue = Queue()
self.last_update_id = 0
self.broadcaster = Broadcaster(self.bot, self.update_queue)
def start(self, poll_interval=1.0):
"""
Starts polling updates from Telegram.
Args:
**kwargs: Arbitrary keyword arguments.
Keyword Args:
poll_interval (Optional[float]): Time to wait between polling
updates from Telegram in seconds. Default is 1.0
Returns:
Queue: The update queue that can be filled from the main thread
"""
# Create Thread objects
broadcaster_thread = Thread(target=self.broadcaster.start,
name="broadcaster")
event_handler_thread = Thread(target=self.__start, name="eventhandler",
args=(poll_interval,))
# Set threads as daemons so they'll stop if the main thread stops
broadcaster_thread.daemon = True
event_handler_thread.daemon = True
# Start threads
broadcaster_thread.start()
event_handler_thread.start()
# Return the update queue so the main thread can insert updates
return self.update_queue
def __start(self, poll_interval):
"""
Thread target of thread 'eventhandler'. Runs in background, pulls
updates from Telegram and inserts them in the update queue of the
Broadcaster.
"""
current_interval = poll_interval
while True:
try:
updates = self.bot.getUpdates(self.last_update_id)
for update in updates:
self.update_queue.put(update)
self.last_update_id = update.update_id + 1
current_interval = poll_interval
sleep(current_interval)
except TelegramError as te:
# Put the error into the update queue and let the Broadcaster
# broadcast it
self.update_queue.put(te)
sleep(current_interval)
# increase waiting times on subsequent errors up to 30secs
if current_interval < 30:
current_interval += current_interval / 2
if current_interval > 30:
current_interval = 30

452
telegram/broadcaster.py Normal file
View file

@ -0,0 +1,452 @@
#!/usr/bin/env python
"""
This module contains the Broadcaster class.
"""
from telegram import (TelegramError, TelegramObject, Update)
class Broadcaster(TelegramObject):
"""
This class broadcasts all kinds of updates to its registered handlers.
Attributes:
Args:
bot (telegram.Bot): The bot object that should be passed to the handlers
update_queue (queue.Queue): The synchronized queue that will contain the
updates
"""
def __init__(self, bot, update_queue):
self.bot = bot
self.update_queue = update_queue
self.telegram_message_handlers = []
self.telegram_command_handlers = {}
self.telegram_regex_handlers = {}
self.string_regex_handlers = {}
self.string_command_handlers = {}
self.type_handlers = {}
self.unknown_telegram_command_handlers = []
self.unknown_string_command_handlers = []
self.error_handlers = []
# Add Handlers
def addTelegramMessageHandler(self, handler):
"""
Registers a message handler in the Broadcaster.
Args:
handler (function): A function that takes (Bot, Update) as
arguments.
"""
self.telegram_message_handlers.append(handler)
def addTelegramCommandHandler(self, command, handler):
"""
Registers a command handler in the Broadcaster.
Args:
command (str): The command keyword that this handler should be
listening to.
handler (function): A function that takes (Bot, Update) as
arguments.
"""
if command not in self.telegram_command_handlers:
self.telegram_command_handlers[command] = []
self.telegram_command_handlers[command].append(handler)
def addTelegramRegexHandler(self, matcher, handler):
"""
Registers a regex handler in the Broadcaster. If handlers will be called
if matcher.math(update.message.text) is True.
Args:
matcher (str): A compiled regex object that matches on messages that
handler should be listening to
handler (function): A function that takes (Bot, Update) as
arguments.
"""
if matcher not in self.telegram_regex_handlers:
self.telegram_regex_handlers[matcher] = []
self.telegram_regex_handlers[matcher].append(handler)
def addStringCommandHandler(self, command, handler):
"""
Registers a string-command handler in the Broadcaster.
Args:
command (str): The command keyword that this handler should be
listening to.
handler (function): A function that takes (Bot, str) as arguments.
"""
if command not in self.string_command_handlers:
self.string_command_handlers[command] = []
self.string_command_handlers[command].append(handler)
def addStringRegexHandler(self, matcher, handler):
"""
Registers a regex handler in the Broadcaster. If handlers will be called
if matcher.math(string) is True.
Args:
matcher (str): A compiled regex object that matches on the string
input that handler should be listening to
handler (function): A function that takes (Bot, Update) as
arguments.
"""
if matcher not in self.string_regex_handlers:
self.string_regex_handlers[matcher] = []
self.string_regex_handlers[matcher].append(handler)
def addUnknownTelegramCommandHandler(self, handler):
"""
Registers a command handler in the Broadcaster, that will receive all
commands that have no associated handler.
Args:
handler (function): A function that takes (Bot, Update) as
arguments.
"""
self.unknown_telegram_command_handlers.append(handler)
def addUnknownStringCommandHandler(self, handler):
"""
Registers a string-command handler in the Broadcaster, that will receive
all commands that have no associated handler.
Args:
handler (function): A function that takes (Bot, str) as arguments.
"""
self.unknown_string_command_handlers.append(handler)
def addErrorHandler(self, handler):
"""
Registers an error handler in the Broadcaster.
Args:
handler (function): A function that takes (Bot, TelegramError) as
arguments.
"""
self.error_handlers.append(handler)
def addTypeHandler(self, the_type, handler):
"""
Registers a type handler in the Broadcaster. This allows you to send
any type of object into the update queue.
Args:
the_type (type): The type this handler should listen to
handler (function): A function that takes (Bot, type) as arguments.
"""
if the_type not in self.type_handlers:
self.type_handlers[the_type] = []
self.type_handlers[the_type].append(handler)
# Remove Handlers
def removeTelegramMessageHandler(self, handler):
"""
De-registers a message handler.
Args:
handler (any):
"""
if handler in self.telegram_message_handlers:
self.telegram_message_handlers.remove(handler)
def removeTelegramCommandHandler(self, command, handler):
"""
De-registers a command handler.
Args:
command (str): The command
handler (any):
"""
if command in self.telegram_command_handlers \
and handler in self.telegram_command_handlers[command]:
self.telegram_command_handlers[command].remove(handler)
def removeTelegramRegexHandler(self, matcher, handler):
"""
De-registers a regex handler.
Args:
matcher (str): The regex matcher object
handler (any):
"""
if matcher in self.telegram_regex_handlers \
and handler in self.telegram_regex_handlers[matcher]:
self.telegram_regex_handlers[matcher].remove(handler)
def removeStringCommandHandler(self, command, handler):
"""
De-registers a string-command handler.
Args:
command (str): The command
handler (any):
"""
if command in self.string_command_handlers \
and handler in self.string_command_handlers[command]:
self.string_command_handlers[command].remove(handler)
def removeStringRegexHandler(self, matcher, handler):
"""
De-registers a regex handler.
Args:
matcher (str): The regex matcher object
handler (any):
"""
if matcher in self.string_regex_handlers \
and handler in self.string_regex_handlers[matcher]:
self.string_regex_handlers[matcher].remove(handler)
def removeUnknownTelegramCommandHandler(self, handler):
"""
De-registers an unknown-command handler.
Args:
handler (any):
"""
if handler in self.unknown_telegram_command_handlers:
self.unknown_telegram_command_handlers.remove(handler)
def removeUnknownStringCommandHandler(self, handler):
"""
De-registers an unknown-command handler.
Args:
handler (any):
"""
if handler in self.unknown_string_command_handlers:
self.unknown_string_command_handlers.remove(handler)
def removeErrorHandler(self, handler):
"""
De-registers an error handler.
Args:
handler (any):
"""
if handler in self.error_handlers:
self.error_handlers.remove(handler)
def removeTypeHandler(self, the_type, handler):
"""
De-registers a type handler.
Args:
handler (any):
"""
if the_type in self.type_handlers \
and handler in self.type_handlers[the_type]:
self.type_handlers[the_type].remove(handler)
def start(self):
"""
Thread target of thread 'broadcaster'. Runs in background and processes
the update queue.
"""
while True:
try:
# Pop update from update queue.
# Blocks if no updates are available.
update = self.update_queue.get()
self.processUpdate(update)
# Broadcast any errors
except TelegramError as te:
self.broadcastError(te)
def processUpdate(self, update):
"""
Processes a single update.
Args:
update (any):
"""
handled = False
# Custom type handlers
for t in self.type_handlers:
if isinstance(update, t):
self.broadcastType(update)
handled = True
# string update
if type(update) is str and update.startswith('/'):
self.broadcastStringCommand(update)
handled = True
elif type(update) is str:
self.broadcastStringRegex(update)
handled = True
# An error happened while polling
if isinstance(update, TelegramError):
self.broadcastError(update)
handled = True
# Telegram update (regex)
if isinstance(update, Update):
self.broadcastTelegramRegex(update)
handled = True
# Telegram update (command)
if isinstance(update, Update) \
and update.message.text.startswith('/'):
self.broadcastTelegramCommand(update)
handled = True
# Telegram update (message)
elif isinstance(update, Update):
self.broadcastTelegramMessage(update)
handled = True
# Update not recognized
if not handled:
self.broadcastError(TelegramError(
"Received update of unknown type %s" % type(update)))
def broadcastTelegramCommand(self, update):
"""
Broadcasts an update that contains a command.
Args:
command (str): The command keyword
update (telegram.Update): The Telegram update that contains the
command
"""
command = update.message.text.split(' ')[0][1:].split('@')[0]
if command in self.telegram_command_handlers:
self.broadcastTo(self.telegram_command_handlers[command], update)
else:
self.broadcastTo(self.unknown_telegram_command_handlers, update)
def broadcastTelegramRegex(self, update):
"""
Broadcasts an update to all regex handlers that match the message string.
Args:
command (str): The command keyword
update (telegram.Update): The Telegram update that contains the
command
"""
matching_handlers = []
for matcher in self.telegram_regex_handlers:
if matcher.match(update.message.text):
for handler in self.telegram_regex_handlers[matcher]:
matching_handlers.append(handler)
self.broadcastTo(matching_handlers, update)
def broadcastStringCommand(self, update):
"""
Broadcasts a string-update that contains a command.
Args:
update (str): The string input
"""
command = update.split(' ')[0][1:]
if command in self.string_command_handlers:
self.broadcastTo(self.string_command_handlers[command], update)
else:
self.broadcastTo(self.unknown_string_command_handlers, update)
def broadcastStringRegex(self, update):
"""
Broadcasts an update to all string regex handlers that match the string.
Args:
command (str): The command keyword
update (telegram.Update): The Telegram update that contains the
command
"""
matching_handlers = []
for matcher in self.string_regex_handlers:
if matcher.match(update):
for handler in self.string_regex_handlers[matcher]:
matching_handlers.append(handler)
self.broadcastTo(matching_handlers, update)
def broadcastType(self, update):
"""
Broadcasts an update of any type.
Args:
update (any): The update
"""
for t in self.type_handlers:
if isinstance(update, t):
self.broadcastTo(self.type_handlers[t], update)
else:
self.broadcastError(TelegramError(
"Received update of unknown type %s" % type(update)))
def broadcastTelegramMessage(self, update):
"""
Broadcasts an update that contains a regular message.
Args:
update (telegram.Update): The Telegram update that contains the
message.
"""
self.broadcastTo(self.telegram_message_handlers, update)
def broadcastError(self, error):
"""
Broadcasts an error.
Args:
error (telegram.TelegramError): The Telegram error that was raised.
"""
for handler in self.error_handlers:
handler(self.bot, error)
def broadcastTo(self, handlers, update):
"""
Broadcasts an update to a list of handlers.
Args:
handlers (list): A list of handler-functions.
update (any): The update to be broadcasted
"""
for handler in handlers:
handler(self.bot, update)