Merge pull request #307 from python-telegram-bot/jobqueue-rework

Make job queue API similar to the dispatcher, add new functionality
This commit is contained in:
Jannes Höke 2016-06-29 16:20:43 +02:00 committed by GitHub
commit 9fd298a393
17 changed files with 473 additions and 201 deletions

View file

@ -17,15 +17,15 @@ Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
from telegram.ext import Updater, CommandHandler
from telegram.ext import Updater, CommandHandler, Job
import logging
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
level=logging.DEBUG)
logger = logging.getLogger(__name__)
job_queue = None
timers = dict()
# Define a few command handlers. These usually take the two arguments bot and
@ -34,8 +34,8 @@ def start(bot, update):
bot.sendMessage(update.message.chat_id, text='Hi! Use /set <seconds> to ' 'set a timer')
def set(bot, update, args):
""" Adds a job to the queue """
def set(bot, update, args, job_queue):
"""Adds a job to the queue"""
chat_id = update.message.chat_id
try:
# args[0] should contain the time for the timer in seconds
@ -43,29 +43,40 @@ def set(bot, update, args):
if due < 0:
bot.sendMessage(chat_id, text='Sorry we can not go back to future!')
def alarm(bot):
""" Inner function to send the alarm message """
def alarm(bot, job):
"""Inner function to send the alarm message"""
bot.sendMessage(chat_id, text='Beep!')
# Add job to queue
job_queue.put(alarm, due, repeat=False)
job = Job(alarm, due, repeat=False)
timers[chat_id] = job
job_queue.put(job)
bot.sendMessage(chat_id, text='Timer successfully set!')
except IndexError:
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
except ValueError:
except (IndexError, ValueError):
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
def unset(bot, update):
"""Removes the job if the user changed their mind"""
chat_id = update.message.chat_id
if chat_id not in timers:
bot.sendMessage(chat_id, text='You have no active timer')
return
job = timers[chat_id]
job.schedule_removal()
bot.sendMessage(chat_id, text='Timer successfully unset!')
def error(bot, update, error):
logger.warn('Update "%s" caused error "%s"' % (update, error))
def main():
global job_queue
updater = Updater("TOKEN")
job_queue = updater.job_queue
# Get the dispatcher to register handlers
dp = updater.dispatcher
@ -73,7 +84,8 @@ def main():
# on different commands - answer in Telegram
dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("help", start))
dp.add_handler(CommandHandler("set", set, pass_args=True))
dp.add_handler(CommandHandler("set", set, pass_args=True, pass_job_queue=True))
dp.add_handler(CommandHandler("unset", unset))
# log all errors
dp.add_error_handler(error)

View file

@ -19,7 +19,7 @@
"""Extensions over the Telegram Bot API to facilitate bot making"""
from .dispatcher import Dispatcher
from .jobqueue import JobQueue
from .jobqueue import JobQueue, Job
from .updater import Updater
from .callbackqueryhandler import CallbackQueryHandler
from .choseninlineresulthandler import ChosenInlineResultHandler
@ -32,7 +32,7 @@ from .stringcommandhandler import StringCommandHandler
from .stringregexhandler import StringRegexHandler
from .typehandler import TypeHandler
__all__ = ('Dispatcher', 'JobQueue', 'Updater', 'CallbackQueryHandler',
__all__ = ('Dispatcher', 'JobQueue', 'Job', 'Updater', 'CallbackQueryHandler',
'ChosenInlineResultHandler', 'CommandHandler', 'Handler', 'InlineQueryHandler',
'MessageHandler', 'Filters', 'RegexHandler', 'StringCommandHandler',
'StringRegexHandler', 'TypeHandler')

View file

@ -31,13 +31,20 @@ class CallbackQueryHandler(Handler):
callback (function): A function that takes ``bot, update`` as
positional arguments. It will be called when the ``check_update``
has determined that an update should be processed by this handler.
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self, callback, pass_update_queue=False):
super(CallbackQueryHandler, self).__init__(callback, pass_update_queue)
def __init__(self, callback, pass_update_queue=False, pass_job_queue=False):
super(CallbackQueryHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
def check_update(self, update):
return isinstance(update, Update) and update.callback_query

View file

@ -32,13 +32,20 @@ class ChosenInlineResultHandler(Handler):
callback (function): A function that takes ``bot, update`` as
positional arguments. It will be called when the ``check_update``
has determined that an update should be processed by this handler.
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self, callback, pass_update_queue=False):
super(ChosenInlineResultHandler, self).__init__(callback, pass_update_queue)
def __init__(self, callback, pass_update_queue=False, pass_job_queue=False):
super(ChosenInlineResultHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
def check_update(self, update):
return isinstance(update, Update) and update.chosen_inline_result

View file

@ -40,9 +40,14 @@ class CommandHandler(Handler):
arguments passed to the command as a keyword argument called `
``args``. It will contain a list of strings, which is the text
following the command split on spaces. Default is ``False``
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self,
@ -50,8 +55,11 @@ class CommandHandler(Handler):
callback,
allow_edited=False,
pass_args=False,
pass_update_queue=False):
super(CommandHandler, self).__init__(callback, pass_update_queue)
pass_update_queue=False,
pass_job_queue=False):
super(CommandHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
self.command = command
self.allow_edited = allow_edited
self.pass_args = pass_args

View file

@ -94,11 +94,16 @@ class Dispatcher(object):
handlers
update_queue (Queue): The synchronized queue that will contain the
updates.
job_queue (Optional[telegram.ext.JobQueue]): The ``JobQueue`` instance to pass onto handler
callbacks
workers (Optional[int]): Number of maximum concurrent worker threads for the ``@run_async``
decorator
"""
def __init__(self, bot, update_queue, workers=4, exception_event=None):
def __init__(self, bot, update_queue, workers=4, exception_event=None, job_queue=None):
self.bot = bot
self.update_queue = update_queue
self.job_queue = job_queue
self.handlers = {}
""":type: dict[int, list[Handler]"""

View file

@ -31,14 +31,20 @@ class Handler(object):
callback (function): A function that takes ``bot, update`` as
positional arguments. It will be called when the ``check_update``
has determined that an update should be processed by this handler.
pass_update_queue (optional[bool]): If the callback should be passed
the update queue as a keyword argument called ``update_queue``. It
can be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self, callback, pass_update_queue=False):
def __init__(self, callback, pass_update_queue=False, pass_job_queue=False):
self.callback = callback
self.pass_update_queue = pass_update_queue
self.pass_job_queue = pass_job_queue
def check_update(self, update):
"""
@ -77,6 +83,8 @@ class Handler(object):
optional_args = dict()
if self.pass_update_queue:
optional_args['update_queue'] = dispatcher.update_queue
if self.pass_job_queue:
optional_args['job_queue'] = dispatcher.job_queue
return optional_args

View file

@ -31,13 +31,20 @@ class InlineQueryHandler(Handler):
callback (function): A function that takes ``bot, update`` as
positional arguments. It will be called when the ``check_update``
has determined that an update should be processed by this handler.
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self, callback, pass_update_queue=False):
super(InlineQueryHandler, self).__init__(callback, pass_update_queue)
def __init__(self, callback, pass_update_queue=False, pass_job_queue=False):
super(InlineQueryHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
def check_update(self, update):
return isinstance(update, Update) and update.inline_query

View file

@ -16,139 +16,240 @@
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the class JobQueue."""
"""This module contains the classes JobQueue and Job."""
import logging
import time
from threading import Thread, Lock
from queue import PriorityQueue
from threading import Thread, Lock, Event
from queue import PriorityQueue, Empty
class JobQueue(object):
"""
This class allows you to periodically perform tasks with the bot.
"""This class allows you to periodically perform tasks with the bot.
Attributes:
tick_interval (float):
queue (PriorityQueue):
bot (Bot):
running (bool):
prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started
automatically. Defaults to ``False``
Args:
bot (Bot): The bot instance that should be passed to the jobs
tick_interval (Optional[float]): The interval this queue should check
the newest task in seconds. Defaults to 1.0
"""
def __init__(self, bot, tick_interval=1.0):
self.tick_interval = tick_interval
def __init__(self, bot, prevent_autostart=False):
self.queue = PriorityQueue()
self.bot = bot
self.logger = logging.getLogger(__name__)
self.__lock = Lock()
self.running = False
self.logger = logging.getLogger(self.__class__.__name__)
self.__start_lock = Lock()
self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick
self.__tick = Event()
self.__thread = None
""":type: Thread"""
self._next_peek = None
""":type: float"""
self._running = False
def put(self, run, interval, repeat=True, next_t=None, prevent_autostart=False):
"""
Queue a new job. If the JobQueue is not running, it will be started.
if not prevent_autostart:
self.logger.debug('Auto-starting %s', self.__class__.__name__)
self.start()
def put(self, job, next_t=None):
"""Queue a new job. If the JobQueue is not running, it will be started.
Args:
run (function): A function that takes the parameter `bot`
interval (float): The interval in seconds in which `run` should be
executed
repeat (Optional[bool]): If `False`, job will only be executed once
next_t (Optional[float]): Time in seconds in which run should be
executed first. Defaults to `interval`
prevent_autostart (Optional[bool]): If `True`, the job queue will
not be started automatically if it is not running.
"""
name = run.__name__
job (Job): The ``Job`` instance representing the new job
next_t (Optional[float]): Time in seconds in which the job should be executed first.
Defaults to ``job.interval``
job = JobQueue.Job()
job.run = run
job.interval = interval
job.name = name
job.repeat = repeat
"""
job.job_queue = self
if next_t is None:
next_t = interval
next_t = job.interval
next_t += time.time()
now = time.time()
next_t += now
self.logger.debug('Putting a %s with t=%f' % (job.name, next_t))
self.logger.debug('Putting job %s with t=%f', job.name, next_t)
self.queue.put((next_t, job))
if not self.running and not prevent_autostart:
self.logger.debug('Auto-starting JobQueue')
self.start()
# Wake up the loop if this job should be executed next
self._set_next_peek(next_t)
def _set_next_peek(self, t):
"""
Set next peek if not defined or `t` is before next peek.
In case the next peek was set, also trigger the `self.__tick` event.
"""
with self.__next_peek_lock:
if not self._next_peek or self._next_peek > t:
self._next_peek = t
self.__tick.set()
def tick(self):
"""
Run all jobs that are due and re-enqueue them with their interval
Run all jobs that are due and re-enqueue them with their interval.
"""
now = time.time()
self.logger.debug('Ticking jobs with t=%f' % now)
while not self.queue.empty():
t, j = self.queue.queue[0]
self.logger.debug('Peeked at %s with t=%f' % (j.name, t))
self.logger.debug('Ticking jobs with t=%f', now)
if t < now:
self.queue.get()
self.logger.debug('Running job %s' % j.name)
try:
j.run(self.bot)
except:
self.logger.exception('An uncaught error was raised while '
'executing job %s' % j.name)
if j.repeat:
self.put(j.run, j.interval)
while True:
try:
t, job = self.queue.get(False)
except Empty:
break
self.logger.debug('Peeked at %s with t=%f', job.name, t)
if t > now:
# we can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already processed
# the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self.queue.put((t, job))
self._set_next_peek(t)
break
if job._remove.is_set():
self.logger.debug('Removing job %s', job.name)
continue
self.logger.debug('Next task isn\'t due yet. Finished!')
break
if job.enabled:
self.logger.debug('Running job %s', job.name)
try:
job.run(self.bot)
except:
self.logger.exception('An uncaught error was raised while executing job %s',
job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat:
self.put(job)
def start(self):
"""
Starts the job_queue thread.
"""
self.__lock.acquire()
if not self.running:
self.running = True
self.__lock.release()
job_queue_thread = Thread(target=self._start, name="job_queue")
job_queue_thread.start()
self.logger.debug('Job Queue thread started')
self.__start_lock.acquire()
if not self._running:
self._running = True
self.__start_lock.release()
self.__thread = Thread(target=self._main_loop, name="job_queue")
self.__thread.start()
self.logger.debug('%s thread started', self.__class__.__name__)
else:
self.__lock.release()
self.__start_lock.release()
def _start(self):
def _main_loop(self):
"""
Thread target of thread 'job_queue'. Runs in background and performs
ticks on the job queue.
Thread target of thread ``job_queue``. Runs in background and performs ticks on the job
queue.
"""
while self.running:
while self._running:
# self._next_peek may be (re)scheduled during self.tick() or self.put()
with self.__next_peek_lock:
tmout = self._next_peek and self._next_peek - time.time()
self._next_peek = None
self.__tick.clear()
self.__tick.wait(tmout)
# If we were woken up by self.stop(), just bail out
if not self._running:
break
self.tick()
time.sleep(self.tick_interval)
self.logger.debug('Job Queue thread stopped')
self.logger.debug('%s thread stopped', self.__class__.__name__)
def stop(self):
"""
Stops the thread
"""
with self.__lock:
self.running = False
with self.__start_lock:
self._running = False
class Job(object):
""" Inner class that represents a job """
interval = None
name = None
repeat = None
self.__tick.set()
if self.__thread is not None:
self.__thread.join()
def run(self):
pass
def jobs(self):
"""Returns a tuple of all jobs that are currently in the ``JobQueue``"""
return tuple(job[1] for job in self.queue.queue if job)
def __lt__(self, other):
return False
class Job(object):
"""This class encapsulates a Job
Attributes:
callback (function):
interval (float):
repeat (bool):
name (str):
enabled (bool): Boolean property that decides if this job is currently active
Args:
callback (function): The callback function that should be executed by the Job. It should
take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` instance. It
can be used to terminate the job or modify its interval.
interval (float): The interval in which this job should execute its callback function in
seconds.
repeat (Optional[bool]): If this job should be periodically execute its callback function
(``True``) or only once (``False``). Defaults to ``True``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
"""
job_queue = None
def __init__(self, callback, interval, repeat=True, context=None):
self.callback = callback
self.interval = interval
self.repeat = repeat
self.context = context
self.name = callback.__name__
self._remove = Event()
self._enabled = Event()
self._enabled.set()
def run(self, bot):
"""Executes the callback function"""
self.callback(bot, self)
def schedule_removal(self):
"""
Schedules this job for removal from the ``JobQueue``. It will be removed without executing
its callback function again.
"""
self._remove.set()
def is_enabled(self):
return self._enabled.is_set()
def set_enabled(self, status):
if status:
self._enabled.set()
else:
self._enabled.clear()
enabled = property(is_enabled, set_enabled)
def __lt__(self, other):
return False

View file

@ -105,8 +105,15 @@ class MessageHandler(Handler):
be used to insert updates. Default is ``False``
"""
def __init__(self, filters, callback, allow_edited=False, pass_update_queue=False):
super(MessageHandler, self).__init__(callback, pass_update_queue)
def __init__(self,
filters,
callback,
allow_edited=False,
pass_update_queue=False,
pass_job_queue=False):
super(MessageHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
self.filters = filters
self.allow_edited = allow_edited

View file

@ -45,9 +45,14 @@ class RegexHandler(Handler):
pass_groupdict (optional[bool]): If the callback should be passed the
result of ``re.match(pattern, text).groupdict()`` as a keyword
argument called ``groupdict``. Default is ``False``
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self,
@ -55,8 +60,11 @@ class RegexHandler(Handler):
callback,
pass_groups=False,
pass_groupdict=False,
pass_update_queue=False):
super(RegexHandler, self).__init__(callback, pass_update_queue)
pass_update_queue=False,
pass_job_queue=False):
super(RegexHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
if isinstance(pattern, string_types):
pattern = re.compile(pattern)

View file

@ -36,13 +36,25 @@ class StringCommandHandler(Handler):
arguments passed to the command as a keyword argument called `
``args``. It will contain a list of strings, which is the text
following the command split on spaces. Default is ``False``
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self, command, callback, pass_args=False, pass_update_queue=False):
super(StringCommandHandler, self).__init__(callback, pass_update_queue)
def __init__(self,
command,
callback,
pass_args=False,
pass_update_queue=False,
pass_job_queue=False):
super(StringCommandHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
self.command = command
self.pass_args = pass_args

View file

@ -44,9 +44,14 @@ class StringRegexHandler(Handler):
pass_groupdict (optional[bool]): If the callback should be passed the
result of ``re.match(pattern, update).groupdict()`` as a keyword
argument called ``groupdict``. Default is ``False``
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self,
@ -54,8 +59,11 @@ class StringRegexHandler(Handler):
callback,
pass_groups=False,
pass_groupdict=False,
pass_update_queue=False):
super(StringRegexHandler, self).__init__(callback, pass_update_queue)
pass_update_queue=False,
pass_job_queue=False):
super(StringRegexHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
if isinstance(pattern, string_types):
pattern = re.compile(pattern)

View file

@ -34,13 +34,25 @@ class TypeHandler(Handler):
has determined that an update should be processed by this handler.
strict (optional[bool]): Use ``type`` instead of ``isinstance``.
Default is ``False``
pass_update_queue (optional[bool]): If the handler should be passed the
update queue as a keyword argument called ``update_queue``. It can
be used to insert updates. Default is ``False``
pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called
``update_queue`` will be passed to the callback function. It will be the ``Queue``
instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can
be used to insert updates. Default is ``False``.
pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called
``job_queue`` will be passed to the callback function. It will be a ``JobQueue``
instance created by the ``Updater`` which can be used to schedule new jobs.
Default is ``False``.
"""
def __init__(self, type, callback, strict=False, pass_update_queue=False):
super(TypeHandler, self).__init__(callback, pass_update_queue)
def __init__(self,
type,
callback,
strict=False,
pass_update_queue=False,
pass_job_queue=False):
super(TypeHandler, self).__init__(callback,
pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue)
self.type = type
self.strict = strict

View file

@ -65,12 +65,7 @@ class Updater(object):
ValueError: If both `token` and `bot` are passed or none of them.
"""
def __init__(self,
token=None,
base_url=None,
workers=4,
bot=None,
job_queue_tick_interval=1.0):
def __init__(self, token=None, base_url=None, workers=4, bot=None):
if (token is None) and (bot is None):
raise ValueError('`token` or `bot` must be passed')
if (token is not None) and (bot is not None):
@ -81,9 +76,13 @@ class Updater(object):
else:
self.bot = Bot(token, base_url)
self.update_queue = Queue()
self.job_queue = JobQueue(self.bot, job_queue_tick_interval)
self.job_queue = JobQueue(self.bot)
self.__exception_event = Event()
self.dispatcher = Dispatcher(self.bot, self.update_queue, workers, self.__exception_event)
self.dispatcher = Dispatcher(self.bot,
self.update_queue,
job_queue=self.job_queue,
workers=workers,
exception_event=self.__exception_event)
self.last_update_id = 0
self.logger = logging.getLogger(__name__)
self.running = False

View file

@ -32,7 +32,7 @@ else:
sys.path.append('.')
from telegram.utils.request import stop_con_pool
from telegram.ext import JobQueue, Updater
from telegram.ext import JobQueue, Job, Updater
from tests.base import BaseTest
# Enable logging
@ -53,7 +53,7 @@ class JobQueueTest(BaseTest, unittest.TestCase):
"""
def setUp(self):
self.jq = JobQueue("Bot", tick_interval=0.005)
self.jq = JobQueue("Bot")
self.result = 0
def tearDown(self):
@ -61,46 +61,108 @@ class JobQueueTest(BaseTest, unittest.TestCase):
self.jq.stop()
stop_con_pool()
def job1(self, bot):
def job1(self, bot, job):
self.result += 1
def job2(self, bot):
def job2(self, bot, job):
raise Exception("Test Error")
def job3(self, bot, job):
self.result += 1
job.schedule_removal()
def job4(self, bot, job):
self.result += job.context
def test_basic(self):
self.jq.put(self.job1, 0.1)
self.jq.put(Job(self.job1, 0.1))
sleep(1.5)
self.assertGreaterEqual(self.result, 10)
def test_job_with_context(self):
self.jq.put(Job(self.job4, 0.1, context=5))
sleep(1.5)
self.assertGreaterEqual(self.result, 50)
def test_noRepeat(self):
self.jq.put(self.job1, 0.1, repeat=False)
self.jq.put(Job(self.job1, 0.1, repeat=False))
sleep(0.5)
self.assertEqual(1, self.result)
def test_nextT(self):
self.jq.put(self.job1, 0.1, next_t=0.5)
self.jq.put(Job(self.job1, 0.1), next_t=0.5)
sleep(0.45)
self.assertEqual(0, self.result)
sleep(0.1)
self.assertEqual(1, self.result)
def test_multiple(self):
self.jq.put(self.job1, 0.1, repeat=False)
self.jq.put(self.job1, 0.2, repeat=False)
self.jq.put(self.job1, 0.4)
self.jq.put(Job(self.job1, 0.1, repeat=False))
self.jq.put(Job(self.job1, 0.2, repeat=False))
self.jq.put(Job(self.job1, 0.4))
sleep(1)
self.assertEqual(4, self.result)
def test_error(self):
self.jq.put(self.job2, 0.1)
self.jq.put(self.job1, 0.2)
self.jq.start()
sleep(0.4)
def test_disabled(self):
j0 = Job(self.job1, 0.1)
j1 = Job(self.job1, 0.2)
self.jq.put(j0)
self.jq.put(Job(self.job1, 0.4))
self.jq.put(j1)
j0.enabled = False
j1.enabled = False
sleep(1)
self.assertEqual(2, self.result)
def test_schedule_removal(self):
j0 = Job(self.job1, 0.1)
j1 = Job(self.job1, 0.2)
self.jq.put(j0)
self.jq.put(Job(self.job1, 0.4))
self.jq.put(j1)
j0.schedule_removal()
j1.schedule_removal()
sleep(1)
self.assertEqual(2, self.result)
def test_schedule_removal_from_within(self):
self.jq.put(Job(self.job1, 0.4))
self.jq.put(Job(self.job3, 0.2))
sleep(1)
self.assertEqual(3, self.result)
def test_longer_first(self):
self.jq.put(Job(self.job1, 0.2, repeat=False))
self.jq.put(Job(self.job1, 0.1, repeat=False))
sleep(0.15)
self.assertEqual(1, self.result)
def test_error(self):
self.jq.put(Job(self.job2, 0.1))
self.jq.put(Job(self.job1, 0.2))
self.jq.start()
sleep(0.5)
self.assertEqual(2, self.result)
def test_jobs_tuple(self):
self.jq.stop()
jobs = tuple(Job(self.job1, t) for t in range(5, 25))
for job in jobs:
self.jq.put(job)
self.assertTupleEqual(jobs, self.jq.jobs())
def test_inUpdater(self):
u = Updater(bot="MockBot", job_queue_tick_interval=0.005)
u.job_queue.put(self.job1, 0.5)
u = Updater(bot="MockBot")
u.job_queue.put(Job(self.job1, 0.5))
sleep(0.75)
self.assertEqual(1, self.result)
u.stop()

View file

@ -72,6 +72,11 @@ class UpdaterTest(BaseTest, unittest.TestCase):
WebhookHandler
"""
updater = None
received_message = None
message_count = None
lock = None
def setUp(self):
self.updater = None
self.received_message = None
@ -123,9 +128,12 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.received_message = (groups, groupdict)
self.message_count += 1
def additionalArgsTest(self, bot, update, update_queue, args):
def additionalArgsTest(self, bot, update, update_queue, job_queue, args):
job_queue.put(Job(lambda bot, job: job.schedule_removal(), 0.1))
self.received_message = update
self.message_count += 1
if args[0] == 'resend':
update_queue.put('/test5 noresend')
elif args[0] == 'noresend':
@ -151,13 +159,13 @@ class UpdaterTest(BaseTest, unittest.TestCase):
d = self.updater.dispatcher
from telegram.ext import Filters
handler = MessageHandler([Filters.text], self.telegramHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
self.updater.start_polling(0.01)
sleep(.1)
self.assertEqual(self.received_message, 'Test')
# Remove handler
d.removeHandler(handler)
d.remove_handler(handler)
self.reset()
self.updater.bot.send_messages = 1
@ -188,7 +196,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
def test_addTelegramMessageHandlerMultipleMessages(self):
self._setup_updater('Multiple', 100)
self.updater.dispatcher.addHandler(MessageHandler([], self.telegramHandlerTest))
self.updater.dispatcher.add_handler(MessageHandler([], self.telegramHandlerTest))
self.updater.start_polling(0.0)
sleep(2)
self.assertEqual(self.received_message, 'Multiple')
@ -199,13 +207,13 @@ class UpdaterTest(BaseTest, unittest.TestCase):
d = self.updater.dispatcher
regobj = re.compile('Te.*')
handler = RegexHandler(regobj, self.telegramHandlerTest)
self.updater.dispatcher.addHandler(handler)
self.updater.dispatcher.add_handler(handler)
self.updater.start_polling(0.01)
sleep(.1)
self.assertEqual(self.received_message, 'Test2')
# Remove handler
d.removeHandler(handler)
d.remove_handler(handler)
self.reset()
self.updater.bot.send_messages = 1
@ -216,13 +224,13 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('/test')
d = self.updater.dispatcher
handler = CommandHandler('test', self.telegramHandlerTest)
self.updater.dispatcher.addHandler(handler)
self.updater.dispatcher.add_handler(handler)
self.updater.start_polling(0.01)
sleep(.1)
self.assertEqual(self.received_message, '/test')
# Remove handler
d.removeHandler(handler)
d.remove_handler(handler)
self.reset()
self.updater.bot.send_messages = 1
@ -252,14 +260,14 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
handler = StringRegexHandler('Te.*', self.stringHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
queue = self.updater.start_polling(0.01)
queue.put('Test3')
sleep(.1)
self.assertEqual(self.received_message, 'Test3')
# Remove handler
d.removeHandler(handler)
d.remove_handler(handler)
self.reset()
queue.put('Test3')
@ -270,7 +278,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
handler = StringCommandHandler('test3', self.stringHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
queue = self.updater.start_polling(0.01)
queue.put('/test3')
@ -278,7 +286,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.assertEqual(self.received_message, '/test3')
# Remove handler
d.removeHandler(handler)
d.remove_handler(handler)
self.reset()
queue.put('/test3')
@ -288,7 +296,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
def test_addRemoveErrorHandler(self):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
d.addErrorHandler(self.errorHandlerTest)
d.add_error_handler(self.errorHandlerTest)
queue = self.updater.start_polling(0.01)
error = TelegramError("Unauthorized.")
queue.put(error)
@ -296,7 +304,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.assertEqual(self.received_message, "Unauthorized.")
# Remove handler
d.removeErrorHandler(self.errorHandlerTest)
d.remove_error_handler(self.errorHandlerTest)
self.reset()
queue.put(error)
@ -307,8 +315,8 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
handler = StringRegexHandler('.*', self.errorRaisingHandlerTest)
d.addHandler(handler)
self.updater.dispatcher.addErrorHandler(self.errorHandlerTest)
d.add_handler(handler)
self.updater.dispatcher.add_error_handler(self.errorHandlerTest)
queue = self.updater.start_polling(0.01)
queue.put('Test Error 1')
@ -319,7 +327,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('')
d = self.updater.dispatcher
handler = MessageHandler([], self.telegramHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
self.updater.start_polling(0.01, clean=True)
sleep(.1)
self.assertEqual(self.message_count, 0)
@ -328,7 +336,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
def test_errorOnGetUpdates(self):
self._setup_updater('', raise_error=True)
d = self.updater.dispatcher
d.addErrorHandler(self.errorHandlerTest)
d.add_error_handler(self.errorHandlerTest)
self.updater.start_polling(0.01)
sleep(.1)
self.assertEqual(self.received_message, "Test Error 2")
@ -337,7 +345,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
handler = TypeHandler(dict, self.stringHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
queue = self.updater.start_polling(0.01)
payload = {"Test": 42}
queue.put(payload)
@ -345,7 +353,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.assertEqual(self.received_message, payload)
# Remove handler
d.removeHandler(handler)
d.remove_handler(handler)
self.reset()
queue.put(payload)
@ -357,8 +365,8 @@ class UpdaterTest(BaseTest, unittest.TestCase):
d = self.updater.dispatcher
handler = InlineQueryHandler(self.telegramInlineHandlerTest)
handler2 = ChosenInlineResultHandler(self.telegramInlineHandlerTest)
d.addHandler(handler)
d.addHandler(handler2)
d.add_handler(handler)
d.add_handler(handler2)
queue = self.updater.start_polling(0.01)
update = Update(update_id=0, inline_query="testquery")
update2 = Update(update_id=0, chosen_inline_result="testresult")
@ -371,8 +379,8 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.assertEqual(self.received_message[1], "testresult")
# Remove handler
d.removeHandler(handler)
d.removeHandler(handler2)
d.remove_handler(handler)
d.remove_handler(handler2)
self.reset()
queue.put(update)
@ -383,7 +391,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
handler = CallbackQueryHandler(self.telegramCallbackHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
queue = self.updater.start_polling(0.01)
update = Update(update_id=0, callback_query="testcallback")
queue.put(update)
@ -391,7 +399,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.assertEqual(self.received_message, "testcallback")
# Remove handler
d.removeHandler(handler)
d.remove_handler(handler)
self.reset()
queue.put(update)
@ -402,7 +410,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('Test5', messages=2)
d = self.updater.dispatcher
handler = MessageHandler([], self.asyncHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
self.updater.start_polling(0.01)
sleep(1.2)
self.assertEqual(self.received_message, 'Test5')
@ -413,8 +421,9 @@ class UpdaterTest(BaseTest, unittest.TestCase):
handler = StringCommandHandler('test5',
self.additionalArgsTest,
pass_update_queue=True,
pass_job_queue=True,
pass_args=True)
self.updater.dispatcher.addHandler(handler)
self.updater.dispatcher.add_handler(handler)
queue = self.updater.start_polling(0.01)
queue.put('/test5 resend')
@ -429,7 +438,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.regexGroupHandlerTest,
pass_groupdict=True,
pass_groups=True)
d.addHandler(handler)
d.add_handler(handler)
queue = self.updater.start_polling(0.01)
queue.put('This is a test message for regex group matching.')
sleep(.1)
@ -440,7 +449,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('Test6', messages=2)
d = self.updater.dispatcher
handler = MessageHandler([], self.asyncAdditionalHandlerTest, pass_update_queue=True)
d.addHandler(handler)
d.add_handler(handler)
self.updater.start_polling(0.01)
sleep(1.2)
self.assertEqual(self.received_message, 'Test6')
@ -450,7 +459,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
handler = MessageHandler([], self.telegramHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port for travis
@ -500,7 +509,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
handler = MessageHandler([], self.telegramHandlerTest)
d.addHandler(handler)
d.add_handler(handler)
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port for travis