graceful stopping of threads, use logging, reuse of broadcaster

This commit is contained in:
Jannes Höke 2015-11-15 17:36:38 +01:00
parent 04050ca883
commit 12201f392d
2 changed files with 196 additions and 110 deletions

View file

@ -4,10 +4,11 @@
This module contains the class BotEventHandler, which tries to make creating
Telegram Bots intuitive!
"""
import logging
import sys
from threading import Thread
from telegram import (Bot, TelegramError, TelegramObject, Broadcaster)
from telegram import (Bot, TelegramError, broadcaster, Broadcaster,
NullHandler)
from time import sleep
# Adjust for differences in Python versions
@ -16,6 +17,9 @@ if sys.version_info.major is 2:
elif sys.version_info.major is 3:
from queue import Queue
H = NullHandler()
logging.getLogger(__name__).addHandler(H)
class BotEventHandler:
"""
@ -30,14 +34,25 @@ class BotEventHandler:
Args:
token (str): The bots token given by the @BotFather
base_url (Optional[str]):
broadcaster (Optional[Broadcaster]): Use present Broadcaster object. If
None, a new one will be created.
workers (Optional[int]): Amount of threads in the thread pool for
functions decorated with @run_async
"""
def __init__(self, token, base_url=None, workers=4):
def __init__(self, token, base_url=None, broadcaster=None, workers=4):
self.bot = Bot(token, base_url)
self.update_queue = Queue()
self.last_update_id = 0
self.broadcaster = Broadcaster(self.bot, self.update_queue,
workers=workers)
if broadcaster:
self.update_queue = broadcaster.update_queue
self.broadcaster = broadcaster
else:
self.update_queue = Queue()
self.broadcaster = Broadcaster(self.bot, self.update_queue,
workers=workers)
self.logger = logging.getLogger(__name__)
self.running = False
def start(self, poll_interval=1.0, timeout=10, network_delay=2):
"""
@ -60,9 +75,7 @@ class BotEventHandler:
args=(poll_interval, timeout,
network_delay))
# Set threads as daemons so they'll stop if the main thread stops
broadcaster_thread.daemon = True
event_handler_thread.daemon = True
self.running = True
# Start threads
broadcaster_thread.start()
@ -79,12 +92,19 @@ class BotEventHandler:
"""
current_interval = poll_interval
self.logger.info('Event Handler thread started')
while True:
while self.running:
try:
updates = self.bot.getUpdates(self.last_update_id,
timeout=timeout,
network_delay=network_delay)
if not self.running:
if len(updates) > 0:
self.logger.info('Updates ignored and will be pulled ' +
'again on restart.')
break
for update in updates:
self.update_queue.put(update)
self.last_update_id = update.update_id + 1
@ -102,3 +122,16 @@ class BotEventHandler:
current_interval += current_interval / 2
if current_interval > 30:
current_interval = 30
self.logger.info('Event Handler thread stopped')
def stop(self):
"""
Stops the polling thread and the broadcaster
"""
self.logger.info('Stopping Event Handler and Broadcaster...')
self.running = False
self.broadcaster.stop()
while broadcaster.running_async > 0:
sleep(1)

View file

@ -3,12 +3,20 @@
"""
This module contains the Broadcaster class.
"""
import logging
from functools import wraps
from inspect import getargspec
from threading import Thread, BoundedSemaphore, Lock
from re import match
from telegram import (TelegramError, TelegramObject, Update)
from threading import Thread, BoundedSemaphore
from telegram import (TelegramError, Update, NullHandler)
H = NullHandler()
logging.getLogger(__name__).addHandler(H)
semaphore = None
running_async = 0
async_lock = Lock()
def run_async(func):
@ -24,14 +32,26 @@ def run_async(func):
@wraps(func)
def pooled(*args, **kwargs):
"""
A wrapper to run a thread in a thread pool
"""
global running_async, async_lock
result = func(*args, **kwargs)
semaphore.release()
with async_lock:
running_async -= 1
return result
@wraps(func)
def async_func(*args, **kwargs):
"""
A wrapper to run a function in a thread
"""
global running_async, async_lock
thread = Thread(target=pooled, args=args, kwargs=kwargs)
semaphore.acquire()
with async_lock:
running_async += 1
thread.start()
return thread
@ -61,39 +81,132 @@ class Broadcaster:
self.unknown_telegram_command_handlers = []
self.unknown_string_command_handlers = []
self.error_handlers = []
self.logger = logging.getLogger(__name__)
self.running = False
global semaphore
if not semaphore:
semaphore = BoundedSemaphore(value=workers)
else:
print("Semaphore already initialized, skipping.")
self.logger.info("Semaphore already initialized, skipping.")
class _Stop:
"""
A class which objects can be passed into the update queue to stop the
thread
"""
pass
def start(self):
"""
Thread target of thread 'broadcaster'. Runs in background and processes
the update queue.
"""
self.running = True
self.logger.info('Broadcaster thread started')
while True:
try:
# Pop update from update queue.
# Blocks if no updates are available.
update = self.update_queue.get()
if type(update) is self._Stop:
break
self.processUpdate(update)
# Broadcast any errors
except TelegramError as te:
self.broadcastError(te)
self.logger.info('Broadcaster thread stopped')
def stop(self):
"""
Stops the thread
"""
if self.running:
self.running = False
self.update_queue.put(self._Stop())
def processUpdate(self, update):
"""
Processes a single update.
Args:
update (any):
"""
handled = False
# Custom type handlers
for t in self.type_handlers:
if isinstance(update, t):
self.broadcastType(update)
handled = True
# string update
if type(update) is str and update.startswith('/'):
self.broadcastStringCommand(update)
handled = True
elif type(update) is str:
self.broadcastStringRegex(update)
handled = True
# An error happened while polling
if isinstance(update, TelegramError):
self.broadcastError(update)
handled = True
# Telegram update (regex)
if isinstance(update, Update):
self.broadcastTelegramRegex(update)
handled = True
# Telegram update (command)
if isinstance(update, Update) \
and update.message.text.startswith('/'):
self.broadcastTelegramCommand(update)
handled = True
# Telegram update (message)
elif isinstance(update, Update):
self.broadcastTelegramMessage(update)
handled = True
# Update not recognized
if not handled:
self.broadcastError(TelegramError(
"Received update of unknown type %s" % type(update)))
# Add Handlers
def addTelegramMessageHandler(self, handler):
"""
Registers a message handler in the Broadcaster.
Args:
handler (function): A function that takes (Bot, Update) as
arguments.
"""
self.telegram_message_handlers.append(handler)
def addTelegramCommandHandler(self, command, handler):
"""
Registers a command handler in the Broadcaster.
Args:
command (str): The command keyword that this handler should be
listening to.
command (str): The command keyword that this handler should be
listening to.
handler (function): A function that takes (Bot, Update) as
arguments.
"""
if command not in self.telegram_command_handlers:
self.telegram_command_handlers[command] = []
self.telegram_command_handlers[command].append(handler)
def addTelegramRegexHandler(self, matcher, handler):
@ -112,17 +225,17 @@ class Broadcaster:
self.telegram_regex_handlers[matcher] = []
self.telegram_regex_handlers[matcher].append(handler)
def addStringCommandHandler(self, command, handler):
"""
Registers a string-command handler in the Broadcaster.
Args:
command (str): The command keyword that this handler should be
listening to.
command (str): The command keyword that this handler should be
listening to.
handler (function): A function that takes (Bot, str) as arguments.
"""
if command not in self.string_command_handlers:
self.string_command_handlers[command] = []
@ -149,41 +262,41 @@ class Broadcaster:
"""
Registers a command handler in the Broadcaster, that will receive all
commands that have no associated handler.
Args:
handler (function): A function that takes (Bot, Update) as
arguments.
"""
self.unknown_telegram_command_handlers.append(handler)
def addUnknownStringCommandHandler(self, handler):
"""
Registers a string-command handler in the Broadcaster, that will receive
Registers a string-command handler in the Broadcaster, that will receive
all commands that have no associated handler.
Args:
handler (function): A function that takes (Bot, str) as arguments.
"""
self.unknown_string_command_handlers.append(handler)
def addErrorHandler(self, handler):
"""
Registers an error handler in the Broadcaster.
Args:
handler (function): A function that takes (Bot, TelegramError) as
arguments.
"""
self.error_handlers.append(handler)
def addTypeHandler(self, the_type, handler):
"""
Registers a type handler in the Broadcaster. This allows you to send
any type of object into the update queue.
Args:
the_type (type): The type this handler should listen to
handler (function): A function that takes (Bot, type) as arguments.
@ -191,25 +304,25 @@ class Broadcaster:
if the_type not in self.type_handlers:
self.type_handlers[the_type] = []
self.type_handlers[the_type].append(handler)
# Remove Handlers
def removeTelegramMessageHandler(self, handler):
"""
De-registers a message handler.
Args:
handler (any):
"""
if handler in self.telegram_message_handlers:
self.telegram_message_handlers.remove(handler)
def removeTelegramCommandHandler(self, command, handler):
"""
De-registers a command handler.
Args:
command (str): The command
handler (any):
@ -293,8 +406,8 @@ class Broadcaster:
def removeTypeHandler(self, the_type, handler):
"""
De-registers a type handler.
De-registers a type handler.
Args:
handler (any):
"""
@ -302,73 +415,6 @@ class Broadcaster:
if the_type in self.type_handlers \
and handler in self.type_handlers[the_type]:
self.type_handlers[the_type].remove(handler)
def start(self):
"""
Thread target of thread 'broadcaster'. Runs in background and processes
the update queue.
"""
while True:
try:
# Pop update from update queue.
# Blocks if no updates are available.
update = self.update_queue.get()
self.processUpdate(update)
# Broadcast any errors
except TelegramError as te:
self.broadcastError(te)
def processUpdate(self, update):
"""
Processes a single update.
Args:
update (any):
"""
handled = False
# Custom type handlers
for t in self.type_handlers:
if isinstance(update, t):
self.broadcastType(update)
handled = True
# string update
if type(update) is str and update.startswith('/'):
self.broadcastStringCommand(update)
handled = True
elif type(update) is str:
self.broadcastStringRegex(update)
handled = True
# An error happened while polling
if isinstance(update, TelegramError):
self.broadcastError(update)
handled = True
# Telegram update (regex)
if isinstance(update, Update):
self.broadcastTelegramRegex(update)
handled = True
# Telegram update (command)
if isinstance(update, Update) \
and update.message.text.startswith('/'):
self.broadcastTelegramCommand(update)
handled = True
# Telegram update (message)
elif isinstance(update, Update):
self.broadcastTelegramMessage(update)
handled = True
# Update not recognized
if not handled:
self.broadcastError(TelegramError(
"Received update of unknown type %s" % type(update)))
def broadcastTelegramCommand(self, update):
"""
@ -400,7 +446,7 @@ class Broadcaster:
matching_handlers = []
for matcher in self.telegram_regex_handlers:
if matcher.match(update.message.text):
if match(matcher, update.message.text):
for handler in self.telegram_regex_handlers[matcher]:
matching_handlers.append(handler)
@ -434,7 +480,7 @@ class Broadcaster:
matching_handlers = []
for matcher in self.string_regex_handlers:
if matcher.match(update):
if match(matcher, update):
for handler in self.string_regex_handlers[matcher]:
matching_handlers.append(handler)
@ -487,4 +533,11 @@ class Broadcaster:
"""
for handler in handlers:
handler(self.bot, update)
self.call_handler(handler, update)
def call_handler(self, handler, update):
kwargs = {}
if 'update_queue' in getargspec(handler).args:
kwargs['update_queue'] = self.update_queue
handler(self.bot, update, **kwargs)