fixes, tests and examples for job queue

This commit is contained in:
Jannes Höke 2016-01-04 01:56:22 +01:00
parent f813d4f5ec
commit dd7d1255d1
3 changed files with 213 additions and 5 deletions

97
examples/timerbot.py Normal file
View file

@ -0,0 +1,97 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Simple Bot to send timed Telegram messages
# This program is dedicated to the public domain under the CC0 license.
"""
This Bot uses the Updater class to handle the bot and the JobQueue to send
timed messages.
First, a few handler functions are defined. Then, those functions are passed to
the Dispatcher and registered at their respective places.
Then, the bot is started and runs until we press Ctrl-C on the command line.
Usage:
Basic Alarm Bot example, sends a message after a set time.
Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
from telegram import Updater, JobQueue
import logging
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
job_queue = None
# Define a few command handlers. These usually take the two arguments bot and
# update. Error handlers also receive the raised TelegramError object in error.
def start(bot, update):
bot.sendMessage(update.message.chat_id, text='Hi! Use /set <seconds> to '
'set a timer')
def set(bot, update, args):
""" Adds a job to the queue """
chat_id = update.message.chat_id
try:
# args[0] should contain the time for the timer in seconds
due = int(args[0])
def alarm(bot):
""" Inner function to send the alarm message """
bot.sendMessage(chat_id, text='Beep!')
# Add job to queue
job_queue.put(alarm, due, repeat=False)
bot.sendMessage(chat_id, text='Timer successfully set!')
except IndexError:
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
except ValueError:
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
def error(bot, update, error):
logger.warn('Update "%s" caused error "%s"' % (update, error))
def main():
global job_queue
updater = Updater("148447715:AAHbczRui6gO3RBlKQ2IwU2hMd226LqZE90")
job_queue = JobQueue(updater.bot, tick_interval=1)
# Get the dispatcher to register handlers
dp = updater.dispatcher
# on different commands - answer in Telegram
dp.addTelegramCommandHandler("start", start)
dp.addTelegramCommandHandler("help", start)
dp.addTelegramCommandHandler("set", set)
# log all errors
dp.addErrorHandler(error)
# start the job queue
job_queue.start()
# Start the Bot
updater.start_polling()
# Block until the you presses Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
# After that, also stop the job queue
job_queue.stop()
if __name__ == '__main__':
main()

View file

@ -31,7 +31,8 @@ except ImportError:
class JobQueue(object):
"""This class allows you to periodically perform tasks with the bot.
"""
This class allows you to periodically perform tasks with the bot.
Attributes:
tick_interval (float):
@ -55,7 +56,7 @@ class JobQueue(object):
self.__lock = Lock()
self.running = False
def put(self, run, interval, next_t=None):
def put(self, run, interval, repeat=True, next_t=None):
"""
Queue a new job.
@ -63,6 +64,7 @@ class JobQueue(object):
run (function): A function that takes the parameter `bot`
interval (float): The interval in seconds in which `run` should be
executed
repeat (Optional[bool]): If false, job will only be executed once
next_t (Optional[float]): Time in seconds in which run should be
executed first. Defaults to `interval`
"""
@ -72,6 +74,7 @@ class JobQueue(object):
job.run = run
job.interval = interval
job.name = name
job.repeat = repeat
if next_t is None:
next_t = interval
@ -96,8 +99,13 @@ class JobQueue(object):
if t < now:
self.queue.get()
self.logger.debug("About time! running")
self.logger.info("Running job {}".format(j.name))
try:
j.run(self.bot)
except:
self.logger.exception("An uncaught error was raised while "
"executing job {}".format(j.name))
if j.repeat:
self.put(j.run, j.interval)
continue
@ -115,6 +123,7 @@ class JobQueue(object):
job_queue_thread = Thread(target=self._start,
name="job_queue")
job_queue_thread.start()
self.logger.info('Job Queue thread started')
else:
self.__lock.release()
@ -127,6 +136,8 @@ class JobQueue(object):
self.tick()
time.sleep(self.tick_interval)
self.logger.info('Job Queue thread stopped')
def stop(self):
"""
Stops the thread
@ -138,6 +149,7 @@ class JobQueue(object):
""" Inner class that represents a job """
interval = None
name = None
repeat = None
def run(self):
pass

99
tests/test_jobqueue.py Normal file
View file

@ -0,0 +1,99 @@
#!/usr/bin/env python
# encoding: utf-8
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015 Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""
This module contains a object that represents Tests for JobQueue
"""
import logging
import sys
import re
import os
import signal
from random import randrange
from time import sleep
from datetime import datetime
if sys.version_info[0:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
try:
from urllib2 import urlopen, Request
except ImportError:
from urllib.request import Request, urlopen
sys.path.append('.')
from telegram import JobQueue
from tests.base import BaseTest
# Enable logging
root = logging.getLogger()
root.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.WARN)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
class UpdaterTest(BaseTest, unittest.TestCase):
"""
This object represents Tests for Updater, Dispatcher, WebhookServer and
WebhookHandler
"""
def setUp(self):
self.jq = JobQueue("Bot", tick_interval=0.001)
self.result = 0
def tearDown(self):
if self.jq is not None:
self.jq.stop()
def job1(self, bot):
self.result += 1
def test_basic(self):
print('Testing basic job queue function')
self.jq.put(self.job1, 0.1)
self.jq.start()
sleep(1.05)
self.assertEqual(10, self.result)
def test_noRepeat(self):
print('Testing job queue without repeat')
self.jq.put(self.job1, 0.1, repeat=False)
self.jq.start()
sleep(0.5)
self.assertEqual(1, self.result)
def test_nextT(self):
print('Testing job queue with a set next_t value')
self.jq.put(self.job1, 0.1, next_t=0.5)
self.jq.start()
sleep(0.45)
self.assertEqual(0, self.result)
sleep(0.1)
self.assertEqual(1, self.result)
if __name__ == '__main__':
unittest.main()