Integrate JobQueue with Updater

This commit is contained in:
Jannes Höke 2016-01-05 13:32:19 +01:00
parent c756aa1c02
commit 2875bae881
5 changed files with 47 additions and 28 deletions

View file

@ -304,23 +304,17 @@ There are many more API methods, to read the full API documentation::
_`JobQueue` _`JobQueue`
----------- -----------
The ``JobQueue`` allows you to perform tasks with a delay or even periodically:: The ``JobQueue`` allows you to perform tasks with a delay or even periodically. The ``Updater`` will create one for you::
>>> from telegram import Bot, JobQueue >>> from telegram import Updater
>>> bot = Bot('TOKEN') >>> u = Updater('TOKEN')
>>> j = JobQueue(bot) >>> j = u.job_queue
If you're using the ``Updater``, use the bot created by it instead:: The job queue uses functions for tasks, so we define one and add it to the queue. Usually, when the first job is added to the queue, it wil start automatically. We can prevent this by setting ``prevent_autostart=True``::
>>> from telegram import Updater, JobQueue
>>> updater = Updater('TOKEN')
>>> j = JobQueue(updater.bot)
The job queue uses functions for tasks, so we define one and add it to the queue::
>>> def job1(bot): >>> def job1(bot):
... bot.sendMessage(chat_id='@examplechannel', text='One message every minute') ... bot.sendMessage(chat_id='@examplechannel', text='One message every minute')
>>> j.put(job1, 60, next_t=0) >>> j.put(job1, 60, next_t=0, prevent_autostart=True)
You can also have a job that will not be executed repeatedly:: You can also have a job that will not be executed repeatedly::
@ -328,10 +322,12 @@ You can also have a job that will not be executed repeatedly::
... bot.sendMessage(chat_id='@examplechannel', text='A single message with 30s delay') ... bot.sendMessage(chat_id='@examplechannel', text='A single message with 30s delay')
>>> j.put(job2, 30, repeat=False) >>> j.put(job2, 30, repeat=False)
Now, all you have to do is to start the queue. It runs in a seperate thread, so the ``start()`` call is non-blocking.:: Now, because we didn't prevent the auto start this time, the queue will start ticking. It runs in a seperate thread, so it is non-blocking. When we stop the Updater, the related queue will be stopped as well::
>>> u.stop()
We can also stop the job queue by itself::
>>> j.start()
[...]
>>> j.stop() >>> j.stop()
---------- ----------

View file

@ -48,8 +48,8 @@ from .message import Message
from .update import Update from .update import Update
from .bot import Bot from .bot import Bot
from .dispatcher import Dispatcher from .dispatcher import Dispatcher
from .updater import Updater
from .jobqueue import JobQueue from .jobqueue import JobQueue
from .updater import Updater
__all__ = ['Bot', 'Updater', 'Dispatcher', 'Emoji', 'TelegramError', __all__ = ['Bot', 'Updater', 'Dispatcher', 'Emoji', 'TelegramError',
'InputFile', 'ReplyMarkup', 'ForceReply', 'ReplyKeyboardHide', 'InputFile', 'ReplyMarkup', 'ForceReply', 'ReplyKeyboardHide',

View file

@ -56,17 +56,24 @@ class JobQueue(object):
self.__lock = Lock() self.__lock = Lock()
self.running = False self.running = False
def put(self, run, interval, repeat=True, next_t=None): def put(self,
run,
interval,
repeat=True,
next_t=None,
prevent_autostart=False):
""" """
Queue a new job. Queue a new job. If the JobQueue is not running, it will be started.
Args: Args:
run (function): A function that takes the parameter `bot` run (function): A function that takes the parameter `bot`
interval (float): The interval in seconds in which `run` should be interval (float): The interval in seconds in which `run` should be
executed executed
repeat (Optional[bool]): If false, job will only be executed once repeat (Optional[bool]): If `False`, job will only be executed once
next_t (Optional[float]): Time in seconds in which run should be next_t (Optional[float]): Time in seconds in which run should be
executed first. Defaults to `interval` executed first. Defaults to `interval`
prevent_autostart (Optional[bool]): If `True`, the job queue will
not be started automatically if it is not running.
""" """
name = run.__name__ name = run.__name__
@ -84,6 +91,10 @@ class JobQueue(object):
self.logger.debug("Putting a %s with t=%f" % (job.name, next_t)) self.logger.debug("Putting a %s with t=%f" % (job.name, next_t))
self.queue.put((next_t, job)) self.queue.put((next_t, job))
if not self.running and not prevent_autostart:
self.logger.info("Auto-starting JobQueue")
self.start()
def tick(self): def tick(self):
""" """
Run all jobs that are due and re-enqueue them with their interval Run all jobs that are due and re-enqueue them with their interval

View file

@ -29,7 +29,7 @@ from time import sleep
import subprocess import subprocess
from signal import signal, SIGINT, SIGTERM, SIGABRT from signal import signal, SIGINT, SIGTERM, SIGABRT
from telegram import (Bot, TelegramError, dispatcher, Dispatcher, from telegram import (Bot, TelegramError, dispatcher, Dispatcher,
NullHandler) NullHandler, JobQueue)
from telegram.utils.webhookhandler import (WebhookServer, WebhookHandler) from telegram.utils.webhookhandler import (WebhookServer, WebhookHandler)
# Adjust for differences in Python versions # Adjust for differences in Python versions
@ -74,7 +74,12 @@ class Updater:
ValueError: If both `token` and `bot` are passed or none of them. ValueError: If both `token` and `bot` are passed or none of them.
""" """
def __init__(self, token=None, base_url=None, workers=4, bot=None): def __init__(self,
token=None,
base_url=None,
workers=4,
bot=None,
job_queue_tick_interval=1.0):
if (token is None) and (bot is None): if (token is None) and (bot is None):
raise ValueError('`token` or `bot` must be passed') raise ValueError('`token` or `bot` must be passed')
if (token is not None) and (bot is not None): if (token is not None) and (bot is not None):
@ -85,6 +90,7 @@ class Updater:
else: else:
self.bot = Bot(token, base_url) self.bot = Bot(token, base_url)
self.update_queue = Queue() self.update_queue = Queue()
self.job_queue = JobQueue(bot, job_queue_tick_interval)
self.dispatcher = Dispatcher(self.bot, self.update_queue, self.dispatcher = Dispatcher(self.bot, self.update_queue,
workers=workers) workers=workers)
self.last_update_id = 0 self.last_update_id = 0
@ -257,9 +263,10 @@ class Updater:
def stop(self): def stop(self):
""" """
Stops the polling/webhook thread and the dispatcher Stops the polling/webhook thread, the dispatcher and the job queue
""" """
self.job_queue.stop()
with self.__lock: with self.__lock:
if self.running: if self.running:
self.running = False self.running = False

View file

@ -36,7 +36,7 @@ except ImportError:
sys.path.append('.') sys.path.append('.')
from telegram import JobQueue from telegram import JobQueue, Updater
from tests.base import BaseTest from tests.base import BaseTest
# Enable logging # Enable logging
@ -73,21 +73,18 @@ class JobQueueTest(BaseTest, unittest.TestCase):
def test_basic(self): def test_basic(self):
print('Testing basic job queue function') print('Testing basic job queue function')
self.jq.put(self.job1, 0.1) self.jq.put(self.job1, 0.1)
self.jq.start()
sleep(1.5) sleep(1.5)
self.assertGreaterEqual(self.result, 10) self.assertGreaterEqual(self.result, 10)
def test_noRepeat(self): def test_noRepeat(self):
print('Testing job queue without repeat') print('Testing job queue without repeat')
self.jq.put(self.job1, 0.1, repeat=False) self.jq.put(self.job1, 0.1, repeat=False)
self.jq.start()
sleep(0.5) sleep(0.5)
self.assertEqual(1, self.result) self.assertEqual(1, self.result)
def test_nextT(self): def test_nextT(self):
print('Testing job queue with a set next_t value') print('Testing job queue with a set next_t value')
self.jq.put(self.job1, 0.1, next_t=0.5) self.jq.put(self.job1, 0.1, next_t=0.5)
self.jq.start()
sleep(0.45) sleep(0.45)
self.assertEqual(0, self.result) self.assertEqual(0, self.result)
sleep(0.1) sleep(0.1)
@ -98,7 +95,6 @@ class JobQueueTest(BaseTest, unittest.TestCase):
self.jq.put(self.job1, 0.1, repeat=False) self.jq.put(self.job1, 0.1, repeat=False)
self.jq.put(self.job1, 0.2, repeat=False) self.jq.put(self.job1, 0.2, repeat=False)
self.jq.put(self.job1, 0.4) self.jq.put(self.job1, 0.4)
self.jq.start()
sleep(1) sleep(1)
self.assertEqual(4, self.result) self.assertEqual(4, self.result)
@ -107,7 +103,16 @@ class JobQueueTest(BaseTest, unittest.TestCase):
self.jq.put(self.job2, 0.1) self.jq.put(self.job2, 0.1)
self.jq.put(self.job1, 0.2) self.jq.put(self.job1, 0.2)
self.jq.start() self.jq.start()
self.jq.start() sleep(0.4)
self.assertEqual(1, self.result)
def test_inUpdater(self):
print('Testing job queue created by updater')
u = Updater(bot="MockBot", job_queue_tick_interval=0.005)
u.job_queue.put(self.job1, 0.1)
sleep(0.15)
self.assertEqual(1, self.result)
u.stop()
sleep(0.4) sleep(0.4)
self.assertEqual(1, self.result) self.assertEqual(1, self.result)