Merge pull request #145 from python-telegram-bot/jobqueue

JobQueue by @franciscod
This commit is contained in:
Jannes Höke 2016-01-04 12:34:29 +01:00
commit c2f04331f3
9 changed files with 422 additions and 6 deletions

View file

@ -57,11 +57,13 @@ Table of contents
2. `API`_
3. `Logging`_
3. `JobQueue`_
4. `Examples`_
4. `Logging`_
5. `Documentation`_
5. `Examples`_
6. `Documentation`_
- `License`_
@ -298,6 +300,40 @@ There are many more API methods, to read the full API documentation::
$ pydoc telegram.Bot
-----------
_`JobQueue`
-----------
The ``JobQueue`` allows you to perform tasks with a delay or even periodically::
>>> from telegram import Bot, JobQueue
>>> bot = Bot('TOKEN')
>>> j = JobQueue(bot)
If you're using the ``Updater``, use the bot created by it instead::
>>> from telegram import Updater, JobQueue
>>> updater = Updater('TOKEN')
>>> j = JobQueue(updater.bot)
The job queue uses functions for tasks, so we define one and add it to the queue::
>>> def job1(bot):
... bot.sendMessage(chat_id='@examplechannel', text='One message every minute')
>>> j.put(job1, 60, next_t=0)
You can also have a job that will not be executed repeatedly::
>>> def job2(bot):
... bot.sendMessage(chat_id='@examplechannel', text='A single message with 30s delay')
>>> j.put(job2, 30, repeat=False)
Now, all you have to do is to start the queue. It runs in a seperate thread, so the ``start()`` call is non-blocking.::
>>> j.start()
[...]
>>> j.stop()
----------
_`Logging`
----------
@ -322,6 +358,8 @@ Here follows some examples to help you to get your own Bot up to speed:
- `clibot <https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/clibot.py>`_ has a command line interface.
- `timerbot <https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/timerbot.py>`_ uses the ``JobQueue`` to send timed messages.
- `Welcome Bot <https://github.com/jh0ker/welcomebot>`_ greets everyone who joins a group chat.
Legacy examples (pre-3.0):

View file

@ -0,0 +1,7 @@
telegram.jobqueue module
========================
.. automodule:: telegram.jobqueue
:members:
:undoc-members:
:show-inheritance:

View file

@ -11,6 +11,7 @@ Submodules
telegram.bot
telegram.updater
telegram.dispatcher
telegram.jobqueue
telegram.chataction
telegram.contact
telegram.document

97
examples/timerbot.py Normal file
View file

@ -0,0 +1,97 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Simple Bot to send timed Telegram messages
# This program is dedicated to the public domain under the CC0 license.
"""
This Bot uses the Updater class to handle the bot and the JobQueue to send
timed messages.
First, a few handler functions are defined. Then, those functions are passed to
the Dispatcher and registered at their respective places.
Then, the bot is started and runs until we press Ctrl-C on the command line.
Usage:
Basic Alarm Bot example, sends a message after a set time.
Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
from telegram import Updater, JobQueue
import logging
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
job_queue = None
# Define a few command handlers. These usually take the two arguments bot and
# update. Error handlers also receive the raised TelegramError object in error.
def start(bot, update):
bot.sendMessage(update.message.chat_id, text='Hi! Use /set <seconds> to '
'set a timer')
def set(bot, update, args):
""" Adds a job to the queue """
chat_id = update.message.chat_id
try:
# args[0] should contain the time for the timer in seconds
due = int(args[0])
def alarm(bot):
""" Inner function to send the alarm message """
bot.sendMessage(chat_id, text='Beep!')
# Add job to queue
job_queue.put(alarm, due, repeat=False)
bot.sendMessage(chat_id, text='Timer successfully set!')
except IndexError:
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
except ValueError:
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
def error(bot, update, error):
logger.warn('Update "%s" caused error "%s"' % (update, error))
def main():
global job_queue
updater = Updater("TOKEN")
job_queue = JobQueue(updater.bot, tick_interval=1)
# Get the dispatcher to register handlers
dp = updater.dispatcher
# on different commands - answer in Telegram
dp.addTelegramCommandHandler("start", start)
dp.addTelegramCommandHandler("help", start)
dp.addTelegramCommandHandler("set", set)
# log all errors
dp.addErrorHandler(error)
# start the job queue
job_queue.start()
# Start the Bot
updater.start_polling()
# Block until the you presses Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
# After that, also stop the job queue
job_queue.stop()
if __name__ == '__main__':
main()

View file

@ -49,10 +49,11 @@ from .update import Update
from .bot import Bot
from .dispatcher import Dispatcher
from .updater import Updater
from .jobqueue import JobQueue
__all__ = ['Bot', 'Updater', 'Dispatcher', 'Emoji', 'TelegramError',
'InputFile', 'ReplyMarkup', 'ForceReply', 'ReplyKeyboardHide',
'ReplyKeyboardMarkup', 'UserProfilePhotos', 'ChatAction',
'Location', 'Contact', 'Video', 'Sticker', 'Document', 'File',
'Audio', 'PhotoSize', 'Chat', 'Update', 'ParseMode', 'Message',
'User', 'TelegramObject', 'NullHandler', 'Voice']
'User', 'TelegramObject', 'NullHandler', 'Voice', 'JobQueue']

View file

@ -187,7 +187,8 @@ class Dispatcher:
# All other errors should not stop the thread, just print them
except:
self.logger.exception()
self.logger.exception("An uncaught error was raised while "
"processing an update")
else:
self.__lock.release()
self.logger.info('Dispatcher thread stopped')

156
telegram/jobqueue.py Normal file
View file

@ -0,0 +1,156 @@
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015 Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""
This module contains the class JobQueue
"""
import logging
import time
from threading import Thread, Lock
try:
from queue import PriorityQueue
except ImportError:
from Queue import PriorityQueue
class JobQueue(object):
"""
This class allows you to periodically perform tasks with the bot.
Attributes:
tick_interval (float):
queue (PriorityQueue):
bot (Bot):
running (bool):
Args:
bot (Bot): The bot instance that should be passed to the jobs
Keyword Args:
tick_interval (Optional[float]): The interval this queue should check
the newest task in seconds. Defaults to 1.0
"""
def __init__(self, bot, tick_interval=1.0):
self.tick_interval = tick_interval
self.queue = PriorityQueue()
self.bot = bot
self.logger = logging.getLogger(__name__)
self.__lock = Lock()
self.running = False
def put(self, run, interval, repeat=True, next_t=None):
"""
Queue a new job.
Args:
run (function): A function that takes the parameter `bot`
interval (float): The interval in seconds in which `run` should be
executed
repeat (Optional[bool]): If false, job will only be executed once
next_t (Optional[float]): Time in seconds in which run should be
executed first. Defaults to `interval`
"""
name = run.__name__
job = JobQueue.Job()
job.run = run
job.interval = interval
job.name = name
job.repeat = repeat
if next_t is None:
next_t = interval
next_t += time.time()
self.logger.debug("Putting a %s with t=%f" % (job.name, next_t))
self.queue.put((next_t, job))
def tick(self):
"""
Run all jobs that are due and re-enqueue them with their interval
"""
now = time.time()
self.logger.debug("Ticking jobs with t=%f" % now)
while not self.queue.empty():
t, j = self.queue.queue[0]
self.logger.debug("Peeked at %s with t=%f" % (j.name, t))
if t < now:
self.queue.get()
self.logger.info("Running job %s" % j.name)
try:
j.run(self.bot)
except:
self.logger.exception("An uncaught error was raised while "
"executing job %s" % j.name)
if j.repeat:
self.put(j.run, j.interval)
continue
self.logger.debug("Next task isn't due yet. Finished!")
break
def start(self):
"""
Starts the job_queue thread.
"""
self.__lock.acquire()
if not self.running:
self.running = True
self.__lock.release()
job_queue_thread = Thread(target=self._start,
name="job_queue")
job_queue_thread.start()
self.logger.info('Job Queue thread started')
else:
self.__lock.release()
def _start(self):
"""
Thread target of thread 'job_queue'. Runs in background and performs
ticks on the job queue.
"""
while self.running:
self.tick()
time.sleep(self.tick_interval)
self.logger.info('Job Queue thread stopped')
def stop(self):
"""
Stops the thread
"""
with self.__lock:
self.running = False
class Job(object):
""" Inner class that represents a job """
interval = None
name = None
repeat = None
def run(self):
pass
def __lt__(self, other):
return False

View file

@ -288,7 +288,7 @@ class Updater:
def idle(self, stop_signals=(SIGINT, SIGTERM, SIGABRT)):
"""
Waits for the user to press Ctrl-C and stops the updater
Blocks until one of the signals are received and stops the updater
Args:
stop_signals: Iterable containing signals from the signal module

115
tests/test_jobqueue.py Normal file
View file

@ -0,0 +1,115 @@
#!/usr/bin/env python
# encoding: utf-8
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015 Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""
This module contains a object that represents Tests for JobQueue
"""
import logging
import sys
from time import sleep
if sys.version_info[0:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
try:
from urllib2 import urlopen, Request
except ImportError:
from urllib.request import Request, urlopen
sys.path.append('.')
from telegram import JobQueue
from tests.base import BaseTest
# Enable logging
root = logging.getLogger()
root.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.WARN)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
class JobQueueTest(BaseTest, unittest.TestCase):
"""
This object represents Tests for Updater, Dispatcher, WebhookServer and
WebhookHandler
"""
def setUp(self):
self.jq = JobQueue("Bot", tick_interval=0.005)
self.result = 0
def tearDown(self):
if self.jq is not None:
self.jq.stop()
def job1(self, bot):
self.result += 1
def job2(self, bot):
raise Exception("Test Error")
def test_basic(self):
print('Testing basic job queue function')
self.jq.put(self.job1, 0.1)
self.jq.start()
sleep(1.5)
self.assertGreaterEqual(self.result, 10)
def test_noRepeat(self):
print('Testing job queue without repeat')
self.jq.put(self.job1, 0.1, repeat=False)
self.jq.start()
sleep(0.5)
self.assertEqual(1, self.result)
def test_nextT(self):
print('Testing job queue with a set next_t value')
self.jq.put(self.job1, 0.1, next_t=0.5)
self.jq.start()
sleep(0.45)
self.assertEqual(0, self.result)
sleep(0.1)
self.assertEqual(1, self.result)
def test_multiple(self):
print('Testing job queue with multiple jobs')
self.jq.put(self.job1, 0.1, repeat=False)
self.jq.put(self.job1, 0.2, repeat=False)
self.jq.put(self.job1, 0.4)
self.jq.start()
sleep(1)
self.assertEqual(4, self.result)
def test_error(self):
print('Testing job queue starting twice with an erroneous job')
self.jq.put(self.job2, 0.1)
self.jq.put(self.job1, 0.2)
self.jq.start()
self.jq.start()
sleep(0.4)
self.assertEqual(1, self.result)
if __name__ == '__main__':
unittest.main()