From 3aedd78e297e8647507e2c0c46caddf54b62b331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Wed, 25 May 2016 22:51:13 +0200 Subject: [PATCH] make job queue API similar to the dispatcher, add new functionality --- README.rst | 44 +++++++++-- examples/timerbot.py | 36 ++++++--- telegram/ext/__init__.py | 10 +-- telegram/ext/jobqueue.py | 165 ++++++++++++++++++++++++++++----------- telegram/ext/updater.py | 5 +- tests/test_jobqueue.py | 88 +++++++++++++++++---- 6 files changed, 260 insertions(+), 88 deletions(-) diff --git a/README.rst b/README.rst index b7ce2c8c6..e24af25b7 100644 --- a/README.rst +++ b/README.rst @@ -395,23 +395,53 @@ The ``JobQueue`` allows you to perform tasks with a delay or even periodically. >>> u = Updater('TOKEN') >>> j = u.job_queue -The job queue uses functions for tasks, so we define one and add it to the queue. Usually, when the first job is added to the queue, it wil start automatically. We can prevent this by setting ``prevent_autostart=True``: +The job queue uses the ``Job`` class for tasks. We define a callback function, instantiate a ``Job`` and add it to the queue. + +Usually, when the first job is added to the queue, it wil start automatically. We can prevent this by setting ``prevent_autostart=True``: .. code:: python - >>> def job1(bot): + >>> from telegram.ext import Job + >>> def callback_minute(bot, job): ... bot.sendMessage(chat_id='@examplechannel', text='One message every minute') - >>> j.put(job1, 60, next_t=0, prevent_autostart=True) + ... + >>> job_minute = Job(callback_minute, 60.0, next_t=0.0) + >>> j.put(job_minute, prevent_autostart=True) -You can also have a job that will not be executed repeatedly: +You can also have a job that will be executed only once: .. code:: python - >>> def job2(bot): + >>> def callback_30(bot, job): ... bot.sendMessage(chat_id='@examplechannel', text='A single message with 30s delay') - >>> j.put(job2, 30, repeat=False) + ... + >>> j.put(Job(callback_30, 30.0, repeat=False)) -Now, because we didn't prevent the auto start this time, the queue will start ticking. It runs in a seperate thread, so it is non-blocking. When we stop the Updater, the related queue will be stopped as well: +Now, because we didn't prevent the auto start this time, the queue will start working. It runs in a seperate thread, so it is non-blocking. + +Jobs can be temporarily disabled or completely removed from the ``JobQueue``: + +.. code:: python + + >>> job_minute.enabled = False + >>> job_minute.schedule_removal() + +Please note that ``schedule_removal`` does not immediately removes the job from the queue. Instead, it is marked for removal and will be removed as soon as its current interval is over (it will not run again after being marked for removal). + +A job can also change its own behaviour, as it is passed to the callback function as the second argument: + +.. code:: python + + >>> def callback_increasing(bot, job): + ... bot.sendMessage(chat_id='@examplechannel', + ... text='Sending messages with increasing delay up to 10s, then stops.') + ... job.interval += 1.0 + ... if job.interval > 10.0: + ... job.schedule_removal() + ... + >>> j.put(Job(callback_increasing, 1.0)) + +When we stop the Updater, the related queue will be stopped as well: .. code:: python diff --git a/examples/timerbot.py b/examples/timerbot.py index 4c9faccf7..151f45156 100644 --- a/examples/timerbot.py +++ b/examples/timerbot.py @@ -17,15 +17,16 @@ Press Ctrl-C on the command line or send a signal to the process to stop the bot. """ -from telegram.ext import Updater, CommandHandler +from telegram.ext import Updater, CommandHandler, Job import logging # Enable logging logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - level=logging.INFO) + level=logging.DEBUG) logger = logging.getLogger(__name__) job_queue = None +timers = dict() # Define a few command handlers. These usually take the two arguments bot and @@ -35,7 +36,7 @@ def start(bot, update): def set(bot, update, args): - """ Adds a job to the queue """ + """Adds a job to the queue""" chat_id = update.message.chat_id try: # args[0] should contain the time for the timer in seconds @@ -43,20 +44,34 @@ def set(bot, update, args): if due < 0: bot.sendMessage(chat_id, text='Sorry we can not go back to future!') - def alarm(bot): - """ Inner function to send the alarm message """ + def alarm(bot, job): + """Inner function to send the alarm message""" bot.sendMessage(chat_id, text='Beep!') # Add job to queue - job_queue.put(alarm, due, repeat=False) + job = Job(alarm, due, repeat=False) + timers[chat_id] = job + job_queue.put(job) + bot.sendMessage(chat_id, text='Timer successfully set!') - except IndexError: - bot.sendMessage(chat_id, text='Usage: /set ') - except ValueError: + except (IndexError, ValueError): bot.sendMessage(chat_id, text='Usage: /set ') +def unset(bot, update): + """Removes the job if the user changed their mind""" + chat_id = update.message.chat_id + + if chat_id not in timers: + bot.sendMessage(chat_id, text='You have no active timer') + return + + job = timers[chat_id] + job.schedule_removal() + bot.sendMessage(chat_id, text='Timer successfully unset!') + + def error(bot, update, error): logger.warn('Update "%s" caused error "%s"' % (update, error)) @@ -64,7 +79,7 @@ def error(bot, update, error): def main(): global job_queue - updater = Updater("TOKEN") + updater = Updater("148447715:AAH4M0gzPG11_mdQS1Qeb0Ex30I5-rw9bMY") job_queue = updater.job_queue # Get the dispatcher to register handlers @@ -74,6 +89,7 @@ def main(): dp.add_handler(CommandHandler("start", start)) dp.add_handler(CommandHandler("help", start)) dp.add_handler(CommandHandler("set", set, pass_args=True)) + dp.add_handler(CommandHandler("unset", unset)) # log all errors dp.add_error_handler(error) diff --git a/telegram/ext/__init__.py b/telegram/ext/__init__.py index 59a8b800d..10b0194a5 100644 --- a/telegram/ext/__init__.py +++ b/telegram/ext/__init__.py @@ -19,7 +19,7 @@ """Extensions over the Telegram Bot API to facilitate bot making""" from .dispatcher import Dispatcher -from .jobqueue import JobQueue +from .jobqueue import JobQueue, Job from .updater import Updater from .callbackqueryhandler import CallbackQueryHandler from .choseninlineresulthandler import ChosenInlineResultHandler @@ -32,7 +32,7 @@ from .stringcommandhandler import StringCommandHandler from .stringregexhandler import StringRegexHandler from .typehandler import TypeHandler -__all__ = ('Dispatcher', 'JobQueue', 'Updater', 'CallbackQueryHandler', - 'ChosenInlineResultHandler', 'CommandHandler', 'Handler', 'InlineQueryHandler', - 'MessageHandler', 'Filters', 'RegexHandler', 'StringCommandHandler', - 'StringRegexHandler', 'TypeHandler') +__all__ = ('Dispatcher', 'JobQueue', 'Job', 'Updater', 'CallbackQueryHandler', + 'ChosenInlineResultHandler', 'CommandHandler', 'Handler', + 'InlineQueryHandler', 'MessageHandler', 'Filters', 'RegexHandler', + 'StringCommandHandler', 'StringRegexHandler', 'TypeHandler') diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index 616468399..8a1dad253 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -20,7 +20,7 @@ import logging import time -from threading import Thread, Lock +from threading import Thread, Lock, Event from queue import PriorityQueue @@ -29,56 +29,50 @@ class JobQueue(object): This class allows you to periodically perform tasks with the bot. Attributes: - tick_interval (float): queue (PriorityQueue): bot (Bot): - running (bool): Args: bot (Bot): The bot instance that should be passed to the jobs - tick_interval (Optional[float]): The interval this queue should check - the newest task in seconds. Defaults to 1.0 """ - def __init__(self, bot, tick_interval=1.0): - self.tick_interval = tick_interval + def __init__(self, bot): self.queue = PriorityQueue() self.bot = bot self.logger = logging.getLogger(__name__) self.__lock = Lock() + self.__tick = Event() + self.next_peek = None self.running = False - def put(self, run, interval, repeat=True, next_t=None, prevent_autostart=False): + def put(self, job, next_t=None, prevent_autostart=False): """ Queue a new job. If the JobQueue is not running, it will be started. Args: - run (function): A function that takes the parameter `bot` - interval (float): The interval in seconds in which `run` should be - executed - repeat (Optional[bool]): If `False`, job will only be executed once - next_t (Optional[float]): Time in seconds in which run should be - executed first. Defaults to `interval` - prevent_autostart (Optional[bool]): If `True`, the job queue will - not be started automatically if it is not running. + job (Job): The ``Job`` instance representing the new job + next_t (Optional[float]): Time in seconds in which the job should be executed first. + Defaults to ``job.interval`` + prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started + automatically if it is not running. Defaults to ``False`` """ - name = run.__name__ - job = JobQueue.Job() - job.run = run - job.interval = interval - job.name = name - job.repeat = repeat + job.job_queue = self if next_t is None: - next_t = interval + next_t = job.interval - next_t += time.time() + now = time.time() + next_t += now self.logger.debug('Putting a %s with t=%f' % (job.name, next_t)) self.queue.put((next_t, job)) + if not self.next_peek or self.next_peek > next_t: + self.next_peek = next_t + self.__tick.set() + if not self.running and not prevent_autostart: self.logger.debug('Auto-starting JobQueue') self.start() @@ -90,25 +84,45 @@ class JobQueue(object): now = time.time() self.logger.debug('Ticking jobs with t=%f' % now) - while not self.queue.empty(): - t, j = self.queue.queue[0] - self.logger.debug('Peeked at %s with t=%f' % (j.name, t)) - if t < now: + while not self.queue.empty(): + t, job = self.queue.queue[0] + self.logger.debug('Peeked at %s with t=%f' % (job.name, t)) + + if t <= now: self.queue.get() - self.logger.debug('Running job %s' % j.name) - try: - j.run(self.bot) - except: - self.logger.exception('An uncaught error was raised while ' - 'executing job %s' % j.name) - if j.repeat: - self.put(j.run, j.interval) + + if job._remove.is_set(): + self.logger.debug('Removing job %s' % job.name) + continue + + elif job.enabled: + self.logger.debug('Running job %s' % job.name) + + try: + job.run() + + except: + self.logger.exception('An uncaught error was raised while executing job %s' + % job.name) + + else: + self.logger.debug('Skipping disabled job %s' % job.name) + + if job.repeat: + self.put(job) + continue self.logger.debug('Next task isn\'t due yet. Finished!') + self.next_peek = t break + else: + self.next_peek = None + + self.__tick.clear() + def start(self): """ Starts the job_queue thread. @@ -128,9 +142,16 @@ class JobQueue(object): Thread target of thread 'job_queue'. Runs in background and performs ticks on the job queue. """ + while self.running: + self.__tick.wait(self.next_peek and self.next_peek - time.time()) + + # If we were woken up by set(), wait with the new timeout + if self.__tick.is_set(): + self.__tick.clear() + continue + self.tick() - time.sleep(self.tick_interval) self.logger.debug('Job Queue thread stopped') @@ -141,14 +162,66 @@ class JobQueue(object): with self.__lock: self.running = False - class Job(object): - """ Inner class that represents a job """ - interval = None - name = None - repeat = None + self.__tick.set() - def run(self): - pass + def jobs(self): + """Returns a tuple of all jobs that are currently in the ``JobQueue``""" + return tuple(job[1] for job in self.queue.queue if job) - def __lt__(self, other): - return False + +class Job(object): + """This class encapsulates a Job + + Attributes: + callback (function): + interval (float): + repeat (bool): + name (str): + enabled (bool): If this job is currently active + + Args: + callback (function): The callback function that should be executed by the Job. It should + take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` instance. It + can be used to terminate the job or modify its interval. + interval (float): The interval in which this job should execute its callback function in + seconds. + repeat (Optional[bool]): If this job should be periodically execute its callback function + (``True``) or only once (``False``). (default=``True``) + + """ + job_queue = None + + def __init__(self, callback, interval, repeat=True): + self.callback = callback + self.interval = interval + self.repeat = repeat + + self.name = callback.__name__ + self._remove = Event() + self._enabled = Event() + self._enabled.set() + + def run(self): + """Executes the callback function""" + self.callback(self.job_queue.bot, self) + + def schedule_removal(self): + """ + Schedules this job for removal from the ``JobQueue``. It will be removed without executing + its callback function again. + """ + self._remove.set() + + def is_enabled(self): + return self._enabled.is_set() + + def set_enabled(self, status): + if status: + self._enabled.set() + else: + self._enabled.clear() + + enabled = property(is_enabled, set_enabled) + + def __lt__(self, other): + return False diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index e552c81e2..d9d687ac5 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -69,8 +69,7 @@ class Updater(object): token=None, base_url=None, workers=4, - bot=None, - job_queue_tick_interval=1.0): + bot=None): if (token is None) and (bot is None): raise ValueError('`token` or `bot` must be passed') if (token is not None) and (bot is not None): @@ -81,7 +80,7 @@ class Updater(object): else: self.bot = Bot(token, base_url) self.update_queue = Queue() - self.job_queue = JobQueue(self.bot, job_queue_tick_interval) + self.job_queue = JobQueue(self.bot) self.__exception_event = Event() self.dispatcher = Dispatcher(self.bot, self.update_queue, workers, self.__exception_event) self.last_update_id = 0 diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 4ac90035d..d54ea9890 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -31,7 +31,7 @@ else: sys.path.append('.') -from telegram.ext import JobQueue, Updater +from telegram.ext import JobQueue, Job, Updater from tests.base import BaseTest # Enable logging @@ -52,53 +52,107 @@ class JobQueueTest(BaseTest, unittest.TestCase): """ def setUp(self): - self.jq = JobQueue("Bot", tick_interval=0.005) + self.jq = JobQueue("Bot") self.result = 0 def tearDown(self): if self.jq is not None: self.jq.stop() - def job1(self, bot): + def job1(self, bot, job): self.result += 1 - def job2(self, bot): + def job2(self, bot, job): raise Exception("Test Error") + def job3(self, bot, job): + self.result += 1 + job.schedule_removal() + def test_basic(self): - self.jq.put(self.job1, 0.1) + self.jq.put(Job(self.job1, 0.1)) sleep(1.5) self.assertGreaterEqual(self.result, 10) def test_noRepeat(self): - self.jq.put(self.job1, 0.1, repeat=False) + self.jq.put(Job(self.job1, 0.1, repeat=False)) sleep(0.5) self.assertEqual(1, self.result) def test_nextT(self): - self.jq.put(self.job1, 0.1, next_t=0.5) + self.jq.put(Job(self.job1, 0.1), next_t=0.5) sleep(0.45) self.assertEqual(0, self.result) sleep(0.1) self.assertEqual(1, self.result) def test_multiple(self): - self.jq.put(self.job1, 0.1, repeat=False) - self.jq.put(self.job1, 0.2, repeat=False) - self.jq.put(self.job1, 0.4) + self.jq.put(Job(self.job1, 0.1, repeat=False)) + self.jq.put(Job(self.job1, 0.2, repeat=False)) + self.jq.put(Job(self.job1, 0.4)) sleep(1) self.assertEqual(4, self.result) - def test_error(self): - self.jq.put(self.job2, 0.1) - self.jq.put(self.job1, 0.2) - self.jq.start() - sleep(0.4) + def test_disabled(self): + j0 = Job(self.job1, 0.1) + j1 = Job(self.job1, 0.2) + + self.jq.put(j0) + self.jq.put(Job(self.job1, 0.4)) + self.jq.put(j1) + + j0.enabled = False + j1.enabled = False + + sleep(1) + self.assertEqual(2, self.result) + + def test_schedule_removal(self): + j0 = Job(self.job1, 0.1) + j1 = Job(self.job1, 0.2) + + self.jq.put(j0) + self.jq.put(Job(self.job1, 0.4)) + self.jq.put(j1) + + j0.schedule_removal() + j1.schedule_removal() + + sleep(1) + self.assertEqual(2, self.result) + + def test_schedule_removal_from_within(self): + self.jq.put(Job(self.job1, 0.4)) + self.jq.put(Job(self.job3, 0.2)) + + sleep(1) + self.assertEqual(3, self.result) + + def test_longer_first(self): + self.jq.put(Job(self.job1, 0.2, repeat=False)) + self.jq.put(Job(self.job1, 0.1, repeat=False)) + sleep(0.15) self.assertEqual(1, self.result) + def test_error(self): + self.jq.put(Job(self.job2, 0.1)) + self.jq.put(Job(self.job1, 0.2)) + self.jq.start() + sleep(0.5) + self.assertEqual(2, self.result) + + def test_jobs_tuple(self): + + jobs = tuple(Job(self.job1, t) for t in range(5, 25)) + + for job in jobs: + self.jq.put(job) + + self.assertTupleEqual(jobs, self.jq.jobs()) + def test_inUpdater(self): - u = Updater(bot="MockBot", job_queue_tick_interval=0.005) - u.job_queue.put(self.job1, 0.5) + u = Updater(bot="MockBot") + u.job_queue.put(Job(self.job1, 0.5)) sleep(0.75) self.assertEqual(1, self.result) u.stop()