refactor Broadcaster to Dispatcher

This commit is contained in:
Jannes Höke 2015-11-22 14:47:38 +01:00
parent 88fbf3b5cf
commit 1782d0d19b
6 changed files with 105 additions and 105 deletions

View file

@ -4,7 +4,7 @@
This Bot uses the BotEventHandler class to handle the bot.
First, a few handler functions are defined. Then, those functions are passed to
the Broadcaster and registered at their respective places.
the Dispatcher and registered at their respective places.
Then, the bot is started and the CLI-Loop is entered, where all text inputs are
inserted into the update queue for the bot to handle.
@ -100,34 +100,34 @@ def main():
# Create the EventHandler and pass it your bot's token.
eh = BotEventHandler("TOKEN", workers=2)
# Get the broadcaster to register handlers
bc = eh.broadcaster
# Get the dispatcher to register handlers
dp = eh.dispatcher
# on different commands - answer in Telegram
bc.addTelegramCommandHandler("start", startCommandHandler)
bc.addTelegramCommandHandler("help", helpCommandHandler)
dp.addTelegramCommandHandler("start", startCommandHandler)
dp.addTelegramCommandHandler("help", helpCommandHandler)
# on regex match - print all messages to stdout
bc.addTelegramRegexHandler('.*', anyMessageHandler)
dp.addTelegramRegexHandler('.*', anyMessageHandler)
# on CLI commands - type "/reply text" in terminal to reply to the last
# active chat
bc.addStringCommandHandler('reply', CLIReplyCommandHandler)
dp.addStringCommandHandler('reply', CLIReplyCommandHandler)
# on unknown commands - answer on Telegram
bc.addUnknownTelegramCommandHandler(unknownCommandHandler)
dp.addUnknownTelegramCommandHandler(unknownCommandHandler)
# on unknown CLI commands - notify the user
bc.addUnknownStringCommandHandler(unknownCLICommandHandler)
dp.addUnknownStringCommandHandler(unknownCLICommandHandler)
# on any CLI message that is not a command - resend it as a command
bc.addStringRegexHandler('[^/].*', anyCLIHandler)
dp.addStringRegexHandler('[^/].*', anyCLIHandler)
# on noncommand i.e message - echo the message on Telegram
bc.addTelegramMessageHandler(messageHandler)
dp.addTelegramMessageHandler(messageHandler)
# on error - print error to stdout
bc.addErrorHandler(errorHandler)
dp.addErrorHandler(errorHandler)
# Start the Bot and store the update Queue,
# so we can insert updates ourselves
@ -137,7 +137,7 @@ def main():
update_queue = eh.start_webhook('example.com', 443, 'cert.pem', 'key.key',
listen='0.0.0.0')
'''
# Start CLI-Loop
while True:
try:

View file

@ -4,7 +4,7 @@
This Bot uses the BotEventHandler class to handle the bot.
First, a few handler functions are defined. Then, those functions are passed to
the Broadcaster and registered at their respective places.
the Dispatcher and registered at their respective places.
Then, the bot is started and the CLI-Loop is entered.
Usage:
@ -48,18 +48,18 @@ def main():
# Create the EventHandler and pass it your bot's token.
eh = BotEventHandler("TOKEN")
# Get the broadcaster to register handlers
bc = eh.broadcaster
# Get the dispatcher to register handlers
dp = eh.dispatcher
# on different commands - answer in Telegram
bc.addTelegramCommandHandler("start", start)
bc.addTelegramCommandHandler("help", help)
dp.addTelegramCommandHandler("start", start)
dp.addTelegramCommandHandler("help", help)
# on noncommand i.e message - echo the message on Telegram
bc.addTelegramMessageHandler(echo)
dp.addTelegramMessageHandler(echo)
# on error - print error to stdout
bc.addErrorHandler(error)
dp.addErrorHandler(error)
# Start the Bot
eh.start_polling()

View file

@ -47,10 +47,10 @@ from .parsemode import ParseMode
from .message import Message
from .update import Update
from .bot import Bot
from .broadcaster import Broadcaster
from .broadcaster import Dispatcher
from .boteventhandler import BotEventHandler
__all__ = ['Bot', 'BotEventHandler', 'Broadcaster', 'Emoji', 'TelegramError',
__all__ = ['Bot', 'BotEventHandler', 'Dispatcher', 'Emoji', 'TelegramError',
'InputFile', 'ReplyMarkup', 'ForceReply', 'ReplyKeyboardHide',
'ReplyKeyboardMarkup', 'UserProfilePhotos', 'ChatAction',
'Location', 'Contact', 'Video', 'Sticker', 'Document', 'File',

View file

@ -12,7 +12,7 @@ from time import sleep
import subprocess
from telegram import (Bot, TelegramError, broadcaster, Broadcaster,
from telegram import (Bot, TelegramError, broadcaster, Dispatcher,
NullHandler)
from telegram.utils.webhookhandler import (WebhookServer, WebhookHandler)
@ -49,8 +49,8 @@ class BotEventHandler:
self.bot = Bot(token, base_url)
self.update_queue = Queue()
self.broadcaster = Broadcaster(self.bot, self.update_queue,
workers=workers)
self.dispatcher = Dispatcher(self.bot, self.update_queue,
workers=workers)
self.last_update_id = 0
self.logger = logging.getLogger(__name__)
self.running = False
@ -71,8 +71,8 @@ class BotEventHandler:
"""
# Create Thread objects
broadcaster_thread = Thread(target=self.broadcaster.start,
name="broadcaster")
dispatcher_thread = Thread(target=self.dispatcher.start,
name="dispatcher")
event_handler_thread = Thread(target=self._start_polling,
name="eventhandler",
args=(poll_interval, timeout,
@ -81,7 +81,7 @@ class BotEventHandler:
self.running = True
# Start threads
broadcaster_thread.start()
dispatcher_thread.start()
event_handler_thread.start()
# Return the update queue so the main thread can insert updates
@ -103,8 +103,8 @@ class BotEventHandler:
"""
# Create Thread objects
broadcaster_thread = Thread(target=self.broadcaster.start,
name="broadcaster")
dispatcher_thread = Thread(target=self.dispatcher.start,
name="dispatcher")
event_handler_thread = Thread(target=self._start_webhook,
name="eventhandler",
args=(host, port, cert, key, listen))
@ -112,7 +112,7 @@ class BotEventHandler:
self.running = True
# Start threads
broadcaster_thread.start()
dispatcher_thread.start()
event_handler_thread.start()
# Return the update queue so the main thread can insert updates
@ -122,7 +122,7 @@ class BotEventHandler:
"""
Thread target of thread 'eventhandler'. Runs in background, pulls
updates from Telegram and inserts them in the update queue of the
Broadcaster.
Dispatcher.
"""
current_interval = poll_interval
@ -149,7 +149,7 @@ class BotEventHandler:
sleep(current_interval)
except TelegramError as te:
# Put the error into the update queue and let the Broadcaster
# Put the error into the update queue and let the Dispatcher
# broadcast it
self.update_queue.put(te)
sleep(current_interval)
@ -200,9 +200,9 @@ class BotEventHandler:
def stop(self):
"""
Stops the polling thread and the broadcaster
Stops the polling thread and the dispatcher
"""
self.logger.info('Stopping Event Handler and Broadcaster...')
self.logger.info('Stopping Event Handler and Dispatcher...')
self.running = False
if self.httpd:
@ -212,9 +212,9 @@ class BotEventHandler:
self.httpd.shutdown()
self.httpd = None
self.logger.debug("Requesting Broadcaster to stop...")
self.broadcaster.stop()
self.logger.debug("Requesting Dispatcher to stop...")
self.dispatcher.stop()
while broadcaster.running_async > 0:
sleep(1)
self.logger.debug("Broadcaster stopped.")
self.logger.debug("Dispatcher stopped.")

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python
"""
This module contains the Broadcaster class.
This module contains the Dispatcher class.
"""
import logging
from functools import wraps
@ -58,9 +58,9 @@ def run_async(func):
return async_func
class Broadcaster:
class Dispatcher:
"""
This class broadcasts all kinds of updates to its registered handlers.
This class dispatches all kinds of updates to its registered handlers.
A handler is a function that usually takes the following parameters:
bot: The telegram.Bot instance that received the message
@ -74,7 +74,7 @@ class Broadcaster:
appending one or more of the following arguments in their argument list for
convenience:
update_queue: The Queue instance which contains all new updates and is
processed by the Broadcaster. Be careful with this - you might
processed by the Dispatcher. Be careful with this - you might
create an infinite loop.
args: If the update is an instance str or telegram.Update, this will be
a list that contains the content of the message split on spaces,
@ -119,12 +119,12 @@ class Broadcaster:
def start(self):
"""
Thread target of thread 'broadcaster'. Runs in background and processes
Thread target of thread 'dispatcher'. Runs in background and processes
the update queue.
"""
self.running = True
self.logger.info('Broadcaster thread started')
self.logger.info('Dispatcher thread started')
while True:
update = None
@ -139,11 +139,11 @@ class Broadcaster:
self.processUpdate(update)
# Broadcast any errors
# Dispatch any errors
except TelegramError as te:
self.broadcastError(update, te)
self.dispatchError(update, te)
self.logger.info('Broadcaster thread stopped')
self.logger.info('Dispatcher thread stopped')
def stop(self):
"""
@ -166,47 +166,47 @@ class Broadcaster:
# Custom type handlers
for t in self.type_handlers:
if isinstance(update, t):
self.broadcastType(update)
self.dispatchType(update)
handled = True
# string update
if type(update) is str and update.startswith('/'):
self.broadcastStringCommand(update)
self.dispatchStringCommand(update)
handled = True
elif type(update) is str:
self.broadcastStringRegex(update)
self.dispatchStringRegex(update)
handled = True
# An error happened while polling
if isinstance(update, TelegramError):
self.broadcastError(None, update)
self.dispatchError(None, update)
handled = True
# Telegram update (regex)
if isinstance(update, Update):
self.broadcastTelegramRegex(update)
self.dispatchTelegramRegex(update)
handled = True
# Telegram update (command)
if isinstance(update, Update) \
and update.message.text.startswith('/'):
self.broadcastTelegramCommand(update)
self.dispatchTelegramCommand(update)
handled = True
# Telegram update (message)
elif isinstance(update, Update):
self.broadcastTelegramMessage(update)
self.dispatchTelegramMessage(update)
handled = True
# Update not recognized
if not handled:
self.broadcastError(update, TelegramError(
self.dispatchError(update, TelegramError(
"Received update of unknown type %s" % type(update)))
# Add Handlers
def addTelegramMessageHandler(self, handler):
"""
Registers a message handler in the Broadcaster.
Registers a message handler in the Dispatcher.
Args:
handler (function): A function that takes (Bot, Update, *args) as
@ -217,7 +217,7 @@ class Broadcaster:
def addTelegramCommandHandler(self, command, handler):
"""
Registers a command handler in the Broadcaster.
Registers a command handler in the Dispatcher.
Args:
command (str): The command keyword that this handler should be
@ -233,7 +233,7 @@ class Broadcaster:
def addTelegramRegexHandler(self, matcher, handler):
"""
Registers a regex handler in the Broadcaster. If handlers will be
Registers a regex handler in the Dispatcher. If handlers will be
called if re.match(matcher, update.message.text) is True.
Args:
@ -250,7 +250,7 @@ class Broadcaster:
def addStringCommandHandler(self, command, handler):
"""
Registers a string-command handler in the Broadcaster.
Registers a string-command handler in the Dispatcher.
Args:
command (str): The command keyword that this handler should be
@ -266,7 +266,7 @@ class Broadcaster:
def addStringRegexHandler(self, matcher, handler):
"""
Registers a regex handler in the Broadcaster. If handlers will be
Registers a regex handler in the Dispatcher. If handlers will be
called if re.match(matcher, string) is True.
Args:
@ -283,7 +283,7 @@ class Broadcaster:
def addUnknownTelegramCommandHandler(self, handler):
"""
Registers a command handler in the Broadcaster, that will receive all
Registers a command handler in the Dispatcher, that will receive all
commands that have no associated handler.
Args:
@ -295,7 +295,7 @@ class Broadcaster:
def addUnknownStringCommandHandler(self, handler):
"""
Registers a string-command handler in the Broadcaster, that will
Registers a string-command handler in the Dispatcher, that will
receive all commands that have no associated handler.
Args:
@ -307,7 +307,7 @@ class Broadcaster:
def addErrorHandler(self, handler):
"""
Registers an error handler in the Broadcaster.
Registers an error handler in the Dispatcher.
Args:
handler (function): A function that takes (Bot, TelegramError) as
@ -318,7 +318,7 @@ class Broadcaster:
def addTypeHandler(self, the_type, handler):
"""
Registers a type handler in the Broadcaster. This allows you to send
Registers a type handler in the Dispatcher. This allows you to send
any type of object into the update queue.
Args:
@ -441,9 +441,9 @@ class Broadcaster:
and handler in self.type_handlers[the_type]:
self.type_handlers[the_type].remove(handler)
def broadcastTelegramCommand(self, update):
def dispatchTelegramCommand(self, update):
"""
Broadcasts an update that contains a command.
Dispatches an update that contains a command.
Args:
command (str): The command keyword
@ -454,13 +454,13 @@ class Broadcaster:
command = update.message.text.split(' ')[0][1:].split('@')[0]
if command in self.telegram_command_handlers:
self.broadcastTo(self.telegram_command_handlers[command], update)
self.dispatchTo(self.telegram_command_handlers[command], update)
else:
self.broadcastTo(self.unknown_telegram_command_handlers, update)
self.dispatchTo(self.unknown_telegram_command_handlers, update)
def broadcastTelegramRegex(self, update):
def dispatchTelegramRegex(self, update):
"""
Broadcasts an update to all regex handlers that match the message
Dispatches an update to all regex handlers that match the message
string.
Args:
@ -476,11 +476,11 @@ class Broadcaster:
for handler in self.telegram_regex_handlers[matcher]:
matching_handlers.append(handler)
self.broadcastTo(matching_handlers, update)
self.dispatchTo(matching_handlers, update)
def broadcastStringCommand(self, update):
def dispatchStringCommand(self, update):
"""
Broadcasts a string-update that contains a command.
Dispatches a string-update that contains a command.
Args:
update (str): The string input
@ -489,13 +489,13 @@ class Broadcaster:
command = update.split(' ')[0][1:]
if command in self.string_command_handlers:
self.broadcastTo(self.string_command_handlers[command], update)
self.dispatchTo(self.string_command_handlers[command], update)
else:
self.broadcastTo(self.unknown_string_command_handlers, update)
self.dispatchTo(self.unknown_string_command_handlers, update)
def broadcastStringRegex(self, update):
def dispatchStringRegex(self, update):
"""
Broadcasts an update to all string regex handlers that match the
Dispatches an update to all string regex handlers that match the
string.
Args:
@ -511,11 +511,11 @@ class Broadcaster:
for handler in self.string_regex_handlers[matcher]:
matching_handlers.append(handler)
self.broadcastTo(matching_handlers, update)
self.dispatchTo(matching_handlers, update)
def broadcastType(self, update):
def dispatchType(self, update):
"""
Broadcasts an update of any type.
Dispatches an update of any type.
Args:
update (any): The update
@ -523,25 +523,25 @@ class Broadcaster:
for t in self.type_handlers:
if isinstance(update, t):
self.broadcastTo(self.type_handlers[t], update)
self.dispatchTo(self.type_handlers[t], update)
else:
self.broadcastError(update, TelegramError(
self.dispatchError(update, TelegramError(
"Received update of unknown type %s" % type(update)))
def broadcastTelegramMessage(self, update):
def dispatchTelegramMessage(self, update):
"""
Broadcasts an update that contains a regular message.
Dispatches an update that contains a regular message.
Args:
update (telegram.Update): The Telegram update that contains the
message.
"""
self.broadcastTo(self.telegram_message_handlers, update)
self.dispatchTo(self.telegram_message_handlers, update)
def broadcastError(self, update, error):
def dispatchError(self, update, error):
"""
Broadcasts an error.
Dispatches an error.
Args:
update (any): The pdate that caused the error
@ -551,13 +551,13 @@ class Broadcaster:
for handler in self.error_handlers:
handler(self.bot, update, error)
def broadcastTo(self, handlers, update):
def dispatchTo(self, handlers, update):
"""
Broadcasts an update to a list of handlers.
Dispatches an update to a list of handlers.
Args:
handlers (list): A list of handler-functions.
update (any): The update to be broadcasted
update (any): The update to be dispatched
"""
for handler in handlers:

View file

@ -93,7 +93,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addTelegramMessageHandler(self):
print('Testing addTelegramMessageHandler')
self.beh.bot = MockBot('Test')
self.beh.broadcaster.addTelegramMessageHandler(
self.beh.dispatcher.addTelegramMessageHandler(
self.telegramHandlerTest)
self.beh.start_polling(0.05)
sleep(.1)
@ -102,7 +102,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addTelegramMessageHandlerMultipleMessages(self):
print('Testing addTelegramMessageHandler and send 100 messages...')
self.beh.bot = MockBot('Multiple', 100)
self.beh.broadcaster.addTelegramMessageHandler(
self.beh.dispatcher.addTelegramMessageHandler(
self.telegramHandlerTest)
self.beh.start_polling(0.0)
sleep(.5)
@ -112,7 +112,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addTelegramRegexHandler(self):
print('Testing addStringRegexHandler')
self.beh.bot = MockBot('Test2')
self.beh.broadcaster.addTelegramRegexHandler(re.compile('Te.*'),
self.beh.dispatcher.addTelegramRegexHandler(re.compile('Te.*'),
self.telegramHandlerTest)
self.beh.start_polling(0.05)
sleep(.1)
@ -121,7 +121,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addTelegramCommandHandler(self):
print('Testing addTelegramCommandHandler')
self.beh.bot = MockBot('/test')
self.beh.broadcaster.addTelegramCommandHandler(
self.beh.dispatcher.addTelegramCommandHandler(
'test', self.telegramHandlerTest)
self.beh.start_polling(0.05)
sleep(.1)
@ -130,7 +130,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addUnknownTelegramCommandHandler(self):
print('Testing addUnknownTelegramCommandHandler')
self.beh.bot = MockBot('/test2')
self.beh.broadcaster.addUnknownTelegramCommandHandler(
self.beh.dispatcher.addUnknownTelegramCommandHandler(
self.telegramHandlerTest)
self.beh.start_polling(0.05)
sleep(.1)
@ -139,7 +139,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addStringRegexHandler(self):
print('Testing addStringRegexHandler')
self.beh.bot = MockBot('')
self.beh.broadcaster.addStringRegexHandler(re.compile('Te.*'),
self.beh.dispatcher.addStringRegexHandler(re.compile('Te.*'),
self.stringHandlerTest)
queue = self.beh.start_polling(0.05)
queue.put('Test3')
@ -149,7 +149,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addStringCommandHandler(self):
print('Testing addStringCommandHandler')
self.beh.bot = MockBot('')
self.beh.broadcaster.addStringCommandHandler(
self.beh.dispatcher.addStringCommandHandler(
'test3', self.stringHandlerTest)
queue = self.beh.start_polling(0.05)
@ -160,7 +160,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addUnknownStringCommandHandler(self):
print('Testing addUnknownStringCommandHandler')
self.beh.bot = MockBot('/test')
self.beh.broadcaster.addUnknownStringCommandHandler(
self.beh.dispatcher.addUnknownStringCommandHandler(
self.stringHandlerTest)
queue = self.beh.start_polling(0.05)
queue.put('/test4')
@ -170,7 +170,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addErrorHandler(self):
print('Testing addErrorHandler')
self.beh.bot = MockBot('')
self.beh.broadcaster.addErrorHandler(self.errorHandlerTest)
self.beh.dispatcher.addErrorHandler(self.errorHandlerTest)
queue = self.beh.start_polling(0.05)
error = TelegramError("Unauthorized.")
queue.put(error)
@ -180,7 +180,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_errorOnGetUpdates(self):
print('Testing errorOnGetUpdates')
self.beh.bot = MockBot('', raise_error=True)
self.beh.broadcaster.addErrorHandler(self.errorHandlerTest)
self.beh.dispatcher.addErrorHandler(self.errorHandlerTest)
self.beh.start_polling(0.05)
sleep(.1)
self.assertEqual(self.received_message.message, "Test Error")
@ -188,7 +188,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_addTypeHandler(self):
print('Testing addTypeHandler')
self.beh.bot = MockBot('')
self.beh.broadcaster.addTypeHandler(dict, self.stringHandlerTest)
self.beh.dispatcher.addTypeHandler(dict, self.stringHandlerTest)
queue = self.beh.start_polling(0.05)
payload = {"Test": 42}
queue.put(payload)
@ -198,7 +198,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_runAsync(self):
print('Testing @run_async')
self.beh.bot = MockBot('Test4', messages=2)
self.beh.broadcaster.addTelegramMessageHandler(
self.beh.dispatcher.addTelegramMessageHandler(
self.asyncHandlerTest)
self.beh.start_polling(0.01)
sleep(1.2)
@ -208,7 +208,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_additionalArgs(self):
print('Testing additional arguments for handlers')
self.beh.bot = MockBot('')
self.beh.broadcaster.addStringCommandHandler(
self.beh.dispatcher.addStringCommandHandler(
'test5', self.additionalArgsTest)
queue = self.beh.start_polling(0.05)
@ -220,7 +220,7 @@ class BotEventHandlerTest(BaseTest, unittest.TestCase):
def test_webhook(self):
print('Testing Webhook')
self.beh.bot = MockBot('Test4', messages=2)
self.beh.broadcaster.addTelegramMessageHandler(
self.beh.dispatcher.addTelegramMessageHandler(
self.telegramHandlerTest)
# Select random port for travis