From 2875bae8810d4dc6a6993b392ebb077e4bcdd61e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Tue, 5 Jan 2016 13:32:19 +0100 Subject: [PATCH] Integrate JobQueue with Updater --- README.rst | 26 +++++++++++--------------- telegram/__init__.py | 2 +- telegram/jobqueue.py | 17 ++++++++++++++--- telegram/updater.py | 13 ++++++++++--- tests/test_jobqueue.py | 17 +++++++++++------ 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/README.rst b/README.rst index f49515e4e..dee98f323 100644 --- a/README.rst +++ b/README.rst @@ -304,23 +304,17 @@ There are many more API methods, to read the full API documentation:: _`JobQueue` ----------- -The ``JobQueue`` allows you to perform tasks with a delay or even periodically:: +The ``JobQueue`` allows you to perform tasks with a delay or even periodically. The ``Updater`` will create one for you:: - >>> from telegram import Bot, JobQueue - >>> bot = Bot('TOKEN') - >>> j = JobQueue(bot) + >>> from telegram import Updater + >>> u = Updater('TOKEN') + >>> j = u.job_queue -If you're using the ``Updater``, use the bot created by it instead:: - - >>> from telegram import Updater, JobQueue - >>> updater = Updater('TOKEN') - >>> j = JobQueue(updater.bot) - -The job queue uses functions for tasks, so we define one and add it to the queue:: +The job queue uses functions for tasks, so we define one and add it to the queue. Usually, when the first job is added to the queue, it wil start automatically. We can prevent this by setting ``prevent_autostart=True``:: >>> def job1(bot): ... bot.sendMessage(chat_id='@examplechannel', text='One message every minute') - >>> j.put(job1, 60, next_t=0) + >>> j.put(job1, 60, next_t=0, prevent_autostart=True) You can also have a job that will not be executed repeatedly:: @@ -328,10 +322,12 @@ You can also have a job that will not be executed repeatedly:: ... bot.sendMessage(chat_id='@examplechannel', text='A single message with 30s delay') >>> j.put(job2, 30, repeat=False) -Now, all you have to do is to start the queue. It runs in a seperate thread, so the ``start()`` call is non-blocking.:: +Now, because we didn't prevent the auto start this time, the queue will start ticking. It runs in a seperate thread, so it is non-blocking. When we stop the Updater, the related queue will be stopped as well:: + + >>> u.stop() + +We can also stop the job queue by itself:: - >>> j.start() - [...] >>> j.stop() ---------- diff --git a/telegram/__init__.py b/telegram/__init__.py index 12b2ec193..45c486559 100644 --- a/telegram/__init__.py +++ b/telegram/__init__.py @@ -48,8 +48,8 @@ from .message import Message from .update import Update from .bot import Bot from .dispatcher import Dispatcher -from .updater import Updater from .jobqueue import JobQueue +from .updater import Updater __all__ = ['Bot', 'Updater', 'Dispatcher', 'Emoji', 'TelegramError', 'InputFile', 'ReplyMarkup', 'ForceReply', 'ReplyKeyboardHide', diff --git a/telegram/jobqueue.py b/telegram/jobqueue.py index 0311facda..429a39b00 100644 --- a/telegram/jobqueue.py +++ b/telegram/jobqueue.py @@ -56,17 +56,24 @@ class JobQueue(object): self.__lock = Lock() self.running = False - def put(self, run, interval, repeat=True, next_t=None): + def put(self, + run, + interval, + repeat=True, + next_t=None, + prevent_autostart=False): """ - Queue a new job. + Queue a new job. If the JobQueue is not running, it will be started. Args: run (function): A function that takes the parameter `bot` interval (float): The interval in seconds in which `run` should be executed - repeat (Optional[bool]): If false, job will only be executed once + repeat (Optional[bool]): If `False`, job will only be executed once next_t (Optional[float]): Time in seconds in which run should be executed first. Defaults to `interval` + prevent_autostart (Optional[bool]): If `True`, the job queue will + not be started automatically if it is not running. """ name = run.__name__ @@ -84,6 +91,10 @@ class JobQueue(object): self.logger.debug("Putting a %s with t=%f" % (job.name, next_t)) self.queue.put((next_t, job)) + if not self.running and not prevent_autostart: + self.logger.info("Auto-starting JobQueue") + self.start() + def tick(self): """ Run all jobs that are due and re-enqueue them with their interval diff --git a/telegram/updater.py b/telegram/updater.py index 2a501463b..6fbf0f6a8 100644 --- a/telegram/updater.py +++ b/telegram/updater.py @@ -29,7 +29,7 @@ from time import sleep import subprocess from signal import signal, SIGINT, SIGTERM, SIGABRT from telegram import (Bot, TelegramError, dispatcher, Dispatcher, - NullHandler) + NullHandler, JobQueue) from telegram.utils.webhookhandler import (WebhookServer, WebhookHandler) # Adjust for differences in Python versions @@ -74,7 +74,12 @@ class Updater: ValueError: If both `token` and `bot` are passed or none of them. """ - def __init__(self, token=None, base_url=None, workers=4, bot=None): + def __init__(self, + token=None, + base_url=None, + workers=4, + bot=None, + job_queue_tick_interval=1.0): if (token is None) and (bot is None): raise ValueError('`token` or `bot` must be passed') if (token is not None) and (bot is not None): @@ -85,6 +90,7 @@ class Updater: else: self.bot = Bot(token, base_url) self.update_queue = Queue() + self.job_queue = JobQueue(bot, job_queue_tick_interval) self.dispatcher = Dispatcher(self.bot, self.update_queue, workers=workers) self.last_update_id = 0 @@ -257,9 +263,10 @@ class Updater: def stop(self): """ - Stops the polling/webhook thread and the dispatcher + Stops the polling/webhook thread, the dispatcher and the job queue """ + self.job_queue.stop() with self.__lock: if self.running: self.running = False diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 57e250c1e..c71481577 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -36,7 +36,7 @@ except ImportError: sys.path.append('.') -from telegram import JobQueue +from telegram import JobQueue, Updater from tests.base import BaseTest # Enable logging @@ -73,21 +73,18 @@ class JobQueueTest(BaseTest, unittest.TestCase): def test_basic(self): print('Testing basic job queue function') self.jq.put(self.job1, 0.1) - self.jq.start() sleep(1.5) self.assertGreaterEqual(self.result, 10) def test_noRepeat(self): print('Testing job queue without repeat') self.jq.put(self.job1, 0.1, repeat=False) - self.jq.start() sleep(0.5) self.assertEqual(1, self.result) def test_nextT(self): print('Testing job queue with a set next_t value') self.jq.put(self.job1, 0.1, next_t=0.5) - self.jq.start() sleep(0.45) self.assertEqual(0, self.result) sleep(0.1) @@ -98,7 +95,6 @@ class JobQueueTest(BaseTest, unittest.TestCase): self.jq.put(self.job1, 0.1, repeat=False) self.jq.put(self.job1, 0.2, repeat=False) self.jq.put(self.job1, 0.4) - self.jq.start() sleep(1) self.assertEqual(4, self.result) @@ -107,7 +103,16 @@ class JobQueueTest(BaseTest, unittest.TestCase): self.jq.put(self.job2, 0.1) self.jq.put(self.job1, 0.2) self.jq.start() - self.jq.start() + sleep(0.4) + self.assertEqual(1, self.result) + + def test_inUpdater(self): + print('Testing job queue created by updater') + u = Updater(bot="MockBot", job_queue_tick_interval=0.005) + u.job_queue.put(self.job1, 0.1) + sleep(0.15) + self.assertEqual(1, self.result) + u.stop() sleep(0.4) self.assertEqual(1, self.result)