make job queue API similar to the dispatcher, add new functionality

This commit is contained in:
Jannes Höke 2016-05-25 22:51:13 +02:00
parent d90b0f495d
commit 3aedd78e29
6 changed files with 260 additions and 88 deletions

View file

@ -395,23 +395,53 @@ The ``JobQueue`` allows you to perform tasks with a delay or even periodically.
>>> u = Updater('TOKEN')
>>> j = u.job_queue
The job queue uses functions for tasks, so we define one and add it to the queue. Usually, when the first job is added to the queue, it wil start automatically. We can prevent this by setting ``prevent_autostart=True``:
The job queue uses the ``Job`` class for tasks. We define a callback function, instantiate a ``Job`` and add it to the queue.
Usually, when the first job is added to the queue, it wil start automatically. We can prevent this by setting ``prevent_autostart=True``:
.. code:: python
>>> def job1(bot):
>>> from telegram.ext import Job
>>> def callback_minute(bot, job):
... bot.sendMessage(chat_id='@examplechannel', text='One message every minute')
>>> j.put(job1, 60, next_t=0, prevent_autostart=True)
...
>>> job_minute = Job(callback_minute, 60.0, next_t=0.0)
>>> j.put(job_minute, prevent_autostart=True)
You can also have a job that will not be executed repeatedly:
You can also have a job that will be executed only once:
.. code:: python
>>> def job2(bot):
>>> def callback_30(bot, job):
... bot.sendMessage(chat_id='@examplechannel', text='A single message with 30s delay')
>>> j.put(job2, 30, repeat=False)
...
>>> j.put(Job(callback_30, 30.0, repeat=False))
Now, because we didn't prevent the auto start this time, the queue will start ticking. It runs in a seperate thread, so it is non-blocking. When we stop the Updater, the related queue will be stopped as well:
Now, because we didn't prevent the auto start this time, the queue will start working. It runs in a seperate thread, so it is non-blocking.
Jobs can be temporarily disabled or completely removed from the ``JobQueue``:
.. code:: python
>>> job_minute.enabled = False
>>> job_minute.schedule_removal()
Please note that ``schedule_removal`` does not immediately removes the job from the queue. Instead, it is marked for removal and will be removed as soon as its current interval is over (it will not run again after being marked for removal).
A job can also change its own behaviour, as it is passed to the callback function as the second argument:
.. code:: python
>>> def callback_increasing(bot, job):
... bot.sendMessage(chat_id='@examplechannel',
... text='Sending messages with increasing delay up to 10s, then stops.')
... job.interval += 1.0
... if job.interval > 10.0:
... job.schedule_removal()
...
>>> j.put(Job(callback_increasing, 1.0))
When we stop the Updater, the related queue will be stopped as well:
.. code:: python

View file

@ -17,15 +17,16 @@ Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
from telegram.ext import Updater, CommandHandler
from telegram.ext import Updater, CommandHandler, Job
import logging
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
level=logging.DEBUG)
logger = logging.getLogger(__name__)
job_queue = None
timers = dict()
# Define a few command handlers. These usually take the two arguments bot and
@ -35,7 +36,7 @@ def start(bot, update):
def set(bot, update, args):
""" Adds a job to the queue """
"""Adds a job to the queue"""
chat_id = update.message.chat_id
try:
# args[0] should contain the time for the timer in seconds
@ -43,20 +44,34 @@ def set(bot, update, args):
if due < 0:
bot.sendMessage(chat_id, text='Sorry we can not go back to future!')
def alarm(bot):
""" Inner function to send the alarm message """
def alarm(bot, job):
"""Inner function to send the alarm message"""
bot.sendMessage(chat_id, text='Beep!')
# Add job to queue
job_queue.put(alarm, due, repeat=False)
job = Job(alarm, due, repeat=False)
timers[chat_id] = job
job_queue.put(job)
bot.sendMessage(chat_id, text='Timer successfully set!')
except IndexError:
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
except ValueError:
except (IndexError, ValueError):
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
def unset(bot, update):
"""Removes the job if the user changed their mind"""
chat_id = update.message.chat_id
if chat_id not in timers:
bot.sendMessage(chat_id, text='You have no active timer')
return
job = timers[chat_id]
job.schedule_removal()
bot.sendMessage(chat_id, text='Timer successfully unset!')
def error(bot, update, error):
logger.warn('Update "%s" caused error "%s"' % (update, error))
@ -64,7 +79,7 @@ def error(bot, update, error):
def main():
global job_queue
updater = Updater("TOKEN")
updater = Updater("148447715:AAH4M0gzPG11_mdQS1Qeb0Ex30I5-rw9bMY")
job_queue = updater.job_queue
# Get the dispatcher to register handlers
@ -74,6 +89,7 @@ def main():
dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("help", start))
dp.add_handler(CommandHandler("set", set, pass_args=True))
dp.add_handler(CommandHandler("unset", unset))
# log all errors
dp.add_error_handler(error)

View file

@ -19,7 +19,7 @@
"""Extensions over the Telegram Bot API to facilitate bot making"""
from .dispatcher import Dispatcher
from .jobqueue import JobQueue
from .jobqueue import JobQueue, Job
from .updater import Updater
from .callbackqueryhandler import CallbackQueryHandler
from .choseninlineresulthandler import ChosenInlineResultHandler
@ -32,7 +32,7 @@ from .stringcommandhandler import StringCommandHandler
from .stringregexhandler import StringRegexHandler
from .typehandler import TypeHandler
__all__ = ('Dispatcher', 'JobQueue', 'Updater', 'CallbackQueryHandler',
'ChosenInlineResultHandler', 'CommandHandler', 'Handler', 'InlineQueryHandler',
'MessageHandler', 'Filters', 'RegexHandler', 'StringCommandHandler',
'StringRegexHandler', 'TypeHandler')
__all__ = ('Dispatcher', 'JobQueue', 'Job', 'Updater', 'CallbackQueryHandler',
'ChosenInlineResultHandler', 'CommandHandler', 'Handler',
'InlineQueryHandler', 'MessageHandler', 'Filters', 'RegexHandler',
'StringCommandHandler', 'StringRegexHandler', 'TypeHandler')

View file

@ -20,7 +20,7 @@
import logging
import time
from threading import Thread, Lock
from threading import Thread, Lock, Event
from queue import PriorityQueue
@ -29,56 +29,50 @@ class JobQueue(object):
This class allows you to periodically perform tasks with the bot.
Attributes:
tick_interval (float):
queue (PriorityQueue):
bot (Bot):
running (bool):
Args:
bot (Bot): The bot instance that should be passed to the jobs
tick_interval (Optional[float]): The interval this queue should check
the newest task in seconds. Defaults to 1.0
"""
def __init__(self, bot, tick_interval=1.0):
self.tick_interval = tick_interval
def __init__(self, bot):
self.queue = PriorityQueue()
self.bot = bot
self.logger = logging.getLogger(__name__)
self.__lock = Lock()
self.__tick = Event()
self.next_peek = None
self.running = False
def put(self, run, interval, repeat=True, next_t=None, prevent_autostart=False):
def put(self, job, next_t=None, prevent_autostart=False):
"""
Queue a new job. If the JobQueue is not running, it will be started.
Args:
run (function): A function that takes the parameter `bot`
interval (float): The interval in seconds in which `run` should be
executed
repeat (Optional[bool]): If `False`, job will only be executed once
next_t (Optional[float]): Time in seconds in which run should be
executed first. Defaults to `interval`
prevent_autostart (Optional[bool]): If `True`, the job queue will
not be started automatically if it is not running.
job (Job): The ``Job`` instance representing the new job
next_t (Optional[float]): Time in seconds in which the job should be executed first.
Defaults to ``job.interval``
prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started
automatically if it is not running. Defaults to ``False``
"""
name = run.__name__
job = JobQueue.Job()
job.run = run
job.interval = interval
job.name = name
job.repeat = repeat
job.job_queue = self
if next_t is None:
next_t = interval
next_t = job.interval
next_t += time.time()
now = time.time()
next_t += now
self.logger.debug('Putting a %s with t=%f' % (job.name, next_t))
self.queue.put((next_t, job))
if not self.next_peek or self.next_peek > next_t:
self.next_peek = next_t
self.__tick.set()
if not self.running and not prevent_autostart:
self.logger.debug('Auto-starting JobQueue')
self.start()
@ -90,25 +84,45 @@ class JobQueue(object):
now = time.time()
self.logger.debug('Ticking jobs with t=%f' % now)
while not self.queue.empty():
t, j = self.queue.queue[0]
self.logger.debug('Peeked at %s with t=%f' % (j.name, t))
if t < now:
while not self.queue.empty():
t, job = self.queue.queue[0]
self.logger.debug('Peeked at %s with t=%f' % (job.name, t))
if t <= now:
self.queue.get()
self.logger.debug('Running job %s' % j.name)
try:
j.run(self.bot)
except:
self.logger.exception('An uncaught error was raised while '
'executing job %s' % j.name)
if j.repeat:
self.put(j.run, j.interval)
if job._remove.is_set():
self.logger.debug('Removing job %s' % job.name)
continue
elif job.enabled:
self.logger.debug('Running job %s' % job.name)
try:
job.run()
except:
self.logger.exception('An uncaught error was raised while executing job %s'
% job.name)
else:
self.logger.debug('Skipping disabled job %s' % job.name)
if job.repeat:
self.put(job)
continue
self.logger.debug('Next task isn\'t due yet. Finished!')
self.next_peek = t
break
else:
self.next_peek = None
self.__tick.clear()
def start(self):
"""
Starts the job_queue thread.
@ -128,9 +142,16 @@ class JobQueue(object):
Thread target of thread 'job_queue'. Runs in background and performs
ticks on the job queue.
"""
while self.running:
self.__tick.wait(self.next_peek and self.next_peek - time.time())
# If we were woken up by set(), wait with the new timeout
if self.__tick.is_set():
self.__tick.clear()
continue
self.tick()
time.sleep(self.tick_interval)
self.logger.debug('Job Queue thread stopped')
@ -141,14 +162,66 @@ class JobQueue(object):
with self.__lock:
self.running = False
class Job(object):
""" Inner class that represents a job """
interval = None
name = None
repeat = None
self.__tick.set()
def run(self):
pass
def jobs(self):
"""Returns a tuple of all jobs that are currently in the ``JobQueue``"""
return tuple(job[1] for job in self.queue.queue if job)
def __lt__(self, other):
return False
class Job(object):
"""This class encapsulates a Job
Attributes:
callback (function):
interval (float):
repeat (bool):
name (str):
enabled (bool): If this job is currently active
Args:
callback (function): The callback function that should be executed by the Job. It should
take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` instance. It
can be used to terminate the job or modify its interval.
interval (float): The interval in which this job should execute its callback function in
seconds.
repeat (Optional[bool]): If this job should be periodically execute its callback function
(``True``) or only once (``False``). (default=``True``)
"""
job_queue = None
def __init__(self, callback, interval, repeat=True):
self.callback = callback
self.interval = interval
self.repeat = repeat
self.name = callback.__name__
self._remove = Event()
self._enabled = Event()
self._enabled.set()
def run(self):
"""Executes the callback function"""
self.callback(self.job_queue.bot, self)
def schedule_removal(self):
"""
Schedules this job for removal from the ``JobQueue``. It will be removed without executing
its callback function again.
"""
self._remove.set()
def is_enabled(self):
return self._enabled.is_set()
def set_enabled(self, status):
if status:
self._enabled.set()
else:
self._enabled.clear()
enabled = property(is_enabled, set_enabled)
def __lt__(self, other):
return False

View file

@ -69,8 +69,7 @@ class Updater(object):
token=None,
base_url=None,
workers=4,
bot=None,
job_queue_tick_interval=1.0):
bot=None):
if (token is None) and (bot is None):
raise ValueError('`token` or `bot` must be passed')
if (token is not None) and (bot is not None):
@ -81,7 +80,7 @@ class Updater(object):
else:
self.bot = Bot(token, base_url)
self.update_queue = Queue()
self.job_queue = JobQueue(self.bot, job_queue_tick_interval)
self.job_queue = JobQueue(self.bot)
self.__exception_event = Event()
self.dispatcher = Dispatcher(self.bot, self.update_queue, workers, self.__exception_event)
self.last_update_id = 0

View file

@ -31,7 +31,7 @@ else:
sys.path.append('.')
from telegram.ext import JobQueue, Updater
from telegram.ext import JobQueue, Job, Updater
from tests.base import BaseTest
# Enable logging
@ -52,53 +52,107 @@ class JobQueueTest(BaseTest, unittest.TestCase):
"""
def setUp(self):
self.jq = JobQueue("Bot", tick_interval=0.005)
self.jq = JobQueue("Bot")
self.result = 0
def tearDown(self):
if self.jq is not None:
self.jq.stop()
def job1(self, bot):
def job1(self, bot, job):
self.result += 1
def job2(self, bot):
def job2(self, bot, job):
raise Exception("Test Error")
def job3(self, bot, job):
self.result += 1
job.schedule_removal()
def test_basic(self):
self.jq.put(self.job1, 0.1)
self.jq.put(Job(self.job1, 0.1))
sleep(1.5)
self.assertGreaterEqual(self.result, 10)
def test_noRepeat(self):
self.jq.put(self.job1, 0.1, repeat=False)
self.jq.put(Job(self.job1, 0.1, repeat=False))
sleep(0.5)
self.assertEqual(1, self.result)
def test_nextT(self):
self.jq.put(self.job1, 0.1, next_t=0.5)
self.jq.put(Job(self.job1, 0.1), next_t=0.5)
sleep(0.45)
self.assertEqual(0, self.result)
sleep(0.1)
self.assertEqual(1, self.result)
def test_multiple(self):
self.jq.put(self.job1, 0.1, repeat=False)
self.jq.put(self.job1, 0.2, repeat=False)
self.jq.put(self.job1, 0.4)
self.jq.put(Job(self.job1, 0.1, repeat=False))
self.jq.put(Job(self.job1, 0.2, repeat=False))
self.jq.put(Job(self.job1, 0.4))
sleep(1)
self.assertEqual(4, self.result)
def test_error(self):
self.jq.put(self.job2, 0.1)
self.jq.put(self.job1, 0.2)
self.jq.start()
sleep(0.4)
def test_disabled(self):
j0 = Job(self.job1, 0.1)
j1 = Job(self.job1, 0.2)
self.jq.put(j0)
self.jq.put(Job(self.job1, 0.4))
self.jq.put(j1)
j0.enabled = False
j1.enabled = False
sleep(1)
self.assertEqual(2, self.result)
def test_schedule_removal(self):
j0 = Job(self.job1, 0.1)
j1 = Job(self.job1, 0.2)
self.jq.put(j0)
self.jq.put(Job(self.job1, 0.4))
self.jq.put(j1)
j0.schedule_removal()
j1.schedule_removal()
sleep(1)
self.assertEqual(2, self.result)
def test_schedule_removal_from_within(self):
self.jq.put(Job(self.job1, 0.4))
self.jq.put(Job(self.job3, 0.2))
sleep(1)
self.assertEqual(3, self.result)
def test_longer_first(self):
self.jq.put(Job(self.job1, 0.2, repeat=False))
self.jq.put(Job(self.job1, 0.1, repeat=False))
sleep(0.15)
self.assertEqual(1, self.result)
def test_error(self):
self.jq.put(Job(self.job2, 0.1))
self.jq.put(Job(self.job1, 0.2))
self.jq.start()
sleep(0.5)
self.assertEqual(2, self.result)
def test_jobs_tuple(self):
jobs = tuple(Job(self.job1, t) for t in range(5, 25))
for job in jobs:
self.jq.put(job)
self.assertTupleEqual(jobs, self.jq.jobs())
def test_inUpdater(self):
u = Updater(bot="MockBot", job_queue_tick_interval=0.005)
u.job_queue.put(self.job1, 0.5)
u = Updater(bot="MockBot")
u.job_queue.put(Job(self.job1, 0.5))
sleep(0.75)
self.assertEqual(1, self.result)
u.stop()