diff --git a/AUTHORS.rst b/AUTHORS.rst index 970ea5fce..f27388412 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -41,6 +41,7 @@ The following wonderful people contributed directly or indirectly to this projec - `Joscha Götzer `_ - `Shelomentsev D `_ - `sooyhwang `_ +- `thodnev `_ - `Valentijn `_ - `voider1 `_ - `wjt `_ diff --git a/docs/source/telegram.ext.messagequeue.rst b/docs/source/telegram.ext.messagequeue.rst new file mode 100644 index 000000000..80cce040c --- /dev/null +++ b/docs/source/telegram.ext.messagequeue.rst @@ -0,0 +1,7 @@ +telegram.ext.messagequeue module +================================ + +.. automodule:: telegram.ext.messagequeue + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/telegram.ext.rst b/docs/source/telegram.ext.rst index 4da91810c..9af7aa4c4 100644 --- a/docs/source/telegram.ext.rst +++ b/docs/source/telegram.ext.rst @@ -16,6 +16,7 @@ Submodules telegram.ext.commandhandler telegram.ext.inlinequeryhandler telegram.ext.messagehandler + telegram.ext.messagequeue telegram.ext.filters telegram.ext.regexhandler telegram.ext.stringcommandhandler diff --git a/telegram/ext/messagequeue.py b/telegram/ext/messagequeue.py new file mode 100644 index 000000000..6d512a27f --- /dev/null +++ b/telegram/ext/messagequeue.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python +# +# Module author: +# Tymofii A. Khodniev (thodnev) +# +# A library that provides a Python interface to the Telegram Bot API +# Copyright (C) 2015-2017 +# Leandro Toledo de Souza +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser Public License for more details. +# +# You should have received a copy of the GNU Lesser Public License +# along with this program. If not, see [http://www.gnu.org/licenses/] +'''A throughput-limiting message processor for Telegram bots''' +from telegram.utils import promise + +import functools +import sys +import time +import threading +if sys.version_info.major > 2: + import queue as q +else: + import Queue as q + +# We need to count < 1s intervals, so the most accurate timer is needed +# Starting from Python 3.3 we have time.perf_counter which is the clock +# with the highest resolution available to the system, so let's use it there. +# In Python 2.7, there's no perf_counter yet, so fallback on what we have: +# on Windows, the best available is time.clock while time.time is on +# another platforms (M. Lutz, "Learning Python," 4ed, p.630-634) +if sys.version_info.major == 3 and sys.version_info.minor >= 3: + curtime = time.perf_counter +else: + curtime = time.clock if sys.platform[:3] == 'win' else time.time + + +class DelayQueueError(RuntimeError): + '''Indicates processing errors''' + pass + + +class DelayQueue(threading.Thread): + '''Processes callbacks from queue with specified throughput limits. + Creates a separate thread to process callbacks with delays. + + Args: + queue (:obj:`queue.Queue`, optional): used to pass callbacks to + thread. + Creates `queue.Queue` implicitly if not provided. + burst_limit (:obj:`int`, optional): number of maximum callbacks to + process per time-window defined by `time_limit_ms`. + Defaults to 30. + time_limit_ms (:obj:`int`, optional): defines width of time-window + used when each processing limit is calculated. + Defaults to 1000. + exc_route (:obj:`callable`, optional): a callable, accepting 1 + positional argument; used to route exceptions from processor + thread to main thread; is called on `Exception` subclass + exceptions. + If not provided, exceptions are routed through dummy handler, + which re-raises them. + autostart (:obj:`bool`, optional): if True, processor is started + immediately after object's creation; if False, should be + started manually by `start` method. + Defaults to True. + name (:obj:`str`, optional): thread's name. + Defaults to ``'DelayQueue-N'``, where N is sequential + number of object created. + ''' + _instcnt = 0 # instance counter + + def __init__(self, + queue=None, + burst_limit=30, + time_limit_ms=1000, + exc_route=None, + autostart=True, + name=None): + self._queue = queue if queue is not None else q.Queue() + self.burst_limit = burst_limit + self.time_limit = time_limit_ms / 1000 + self.exc_route = (exc_route if exc_route is not None else self._default_exception_handler) + self.__exit_req = False # flag to gently exit thread + self.__class__._instcnt += 1 + if name is None: + name = '%s-%s' % (self.__class__.__name__, self.__class__._instcnt) + super(DelayQueue, self).__init__(name=name) + self.daemon = False + if autostart: # immediately start processing + super(DelayQueue, self).start() + + def run(self): + '''Do not use the method except for unthreaded testing purposes, + the method normally is automatically called by `start` method. + ''' + times = [] # used to store each callable processing time + while True: + item = self._queue.get() + if self.__exit_req: + return # shutdown thread + # delay routine + now = curtime() + t_delta = now - self.time_limit # calculate early to improve perf. + if times and t_delta > times[-1]: + # if last call was before the limit time-window + # used to impr. perf. in long-interval calls case + times = [now] + else: + # collect last in current limit time-window + times = [t for t in times if t >= t_delta] + times.append(now) + if len(times) >= self.burst_limit: # if throughput limit was hit + time.sleep(times[1] - t_delta) + # finally process one + try: + func, args, kwargs = item + func(*args, **kwargs) + except Exception as exc: # re-route any exceptions + self.exc_route(exc) # to prevent thread exit + + def stop(self, timeout=None): + '''Used to gently stop processor and shutdown its thread. + + Args: + timeout (:obj:`float`): indicates maximum time to wait for + processor to stop and its thread to exit. + If timeout exceeds and processor has not stopped, method + silently returns. `is_alive` could be used afterwards + to check the actual status. If `timeout` set to None, blocks + until processor is shut down. + Defaults to None. + Returns: + None + ''' + self.__exit_req = True # gently request + self._queue.put(None) # put something to unfreeze if frozen + super(DelayQueue, self).join(timeout=timeout) + + @staticmethod + def _default_exception_handler(exc): + '''Dummy exception handler which re-raises exception in thread. + Could be possibly overwritten by subclasses. + ''' + raise exc + + def __call__(self, func, *args, **kwargs): + '''Used to process callbacks in throughput-limiting thread + through queue. + Args: + func (:obj:`callable`): the actual function (or any callable) that + is processed through queue. + *args: variable-length `func` arguments. + **kwargs: arbitrary keyword-arguments to `func`. + Returns: + None + ''' + if not self.is_alive() or self.__exit_req: + raise DelayQueueError('Could not process callback in stopped thread') + self._queue.put((func, args, kwargs)) + + +# The most straightforward way to implement this is to use 2 sequenital delay +# queues, like on classic delay chain schematics in electronics. +# So, message path is: +# msg --> group delay if group msg, else no delay --> normal msg delay --> out +# This way OS threading scheduler cares of timings accuracy. +# (see time.time, time.clock, time.perf_counter, time.sleep @ docs.python.org) +class MessageQueue(object): + '''Implements callback processing with proper delays to avoid hitting + Telegram's message limits. + Contains two `DelayQueue`s, for group and for all messages, interconnected + in delay chain. Callables are processed through *group* `DelayQueue`, then + through *all* `DelayQueue` for group-type messages. For non-group messages, + only the *all* `DelayQueue` is used. + + Args: + all_burst_limit (:obj:`int`, optional): numer of maximum *all-type* + callbacks to process per time-window defined by + `all_time_limit_ms`. + Defaults to 30. + all_time_limit_ms (:obj:`int`, optional): defines width of *all-type* + time-window used when each processing limit is calculated. + Defaults to 1000 ms. + group_burst_limit (:obj:`int`, optional): numer of maximum *group-type* + callbacks to process per time-window defined by + `group_time_limit_ms`. + Defaults to 20. + group_time_limit_ms (:obj:`int`, optional): defines width of + *group-type* time-window used when each processing limit is + calculated. + Defaults to 60000 ms. + exc_route (:obj:`callable`, optional): a callable, accepting one + positional argument; used to route exceptions from processor + threads to main thread; is called on `Exception` subclass + exceptions. + If not provided, exceptions are routed through dummy handler, + which re-raises them. + autostart (:obj:`bool`, optional): if True, processors are started + immediately after object's creation; if False, should be + started manually by `start` method. + Defaults to True. + + Attributes: + _all_delayq (:obj:`telegram.ext.messagequeue.DelayQueue`): actual + `DelayQueue` used for *all-type* callback processing + _group_delayq (:obj:`telegram.ext.messagequeue.DelayQueue`): actual + `DelayQueue` used for *group-type* callback processing + ''' + + def __init__(self, + all_burst_limit=30, + all_time_limit_ms=1000, + group_burst_limit=20, + group_time_limit_ms=60000, + exc_route=None, + autostart=True): + # create accoring delay queues, use composition + self._all_delayq = DelayQueue( + burst_limit=all_burst_limit, + time_limit_ms=all_time_limit_ms, + exc_route=exc_route, + autostart=autostart) + self._group_delayq = DelayQueue( + burst_limit=group_burst_limit, + time_limit_ms=group_time_limit_ms, + exc_route=exc_route, + autostart=autostart) + + def start(self): + '''Method is used to manually start the `MessageQueue` processing + + Returns: + None + ''' + self._all_delayq.start() + self._group_delayq.start() + + def stop(self, timeout=None): + self._group_delayq.stop(timeout=timeout) + self._all_delayq.stop(timeout=timeout) + + stop.__doc__ = DelayQueue.stop.__doc__ or '' # reuse docsting if any + + def __call__(self, promise, is_group_msg=False): + '''Processes callables in troughput-limiting queues to avoid + hitting limits (specified with \*_burst_limit and *\_time_limit_ms). + Args: + promise (:obj:`callable`): mainly the + :obj:`telegram.utils.promise.Promise` (see Notes for other + callables), that is processed in delay queues + is_group_msg (:obj:`bool`, optional): defines whether `promise` + would be processed in *group*+*all* `DelayQueue`s + (if set to ``True``), or only through *all* `DelayQueue` + (if set to ``False``), resulting in needed delays to avoid + hitting specified limits. + Defaults to ``True``. + + Notes: + Method is designed to accept :obj:`telegram.utils.promise.Promise` + as `promise` argument, but other callables could be used too. + For example, lambdas or simple functions could be used to wrap + original func to be called with needed args. + In that case, be sure that either wrapper func does not raise + outside exceptions or the proper `exc_route` handler is provided. + + Returns: + :obj:`callable` used as `promise` argument. + ''' + if not is_group_msg: # ignore middle group delay + self._all_delayq(promise) + else: # use middle group delay + self._group_delayq(self._all_delayq, promise) + return promise + + +def queuedmessage(method): + '''A decorator to be used with `telegram.bot.Bot` send* methods. + + Note: + As it probably wouldn't be a good idea to make this decorator a + property, it had been coded as decorator function, so it implies that + **first positional argument to wrapped MUST be self**\. + + The next object attributes are used by decorator: + + Attributes: + self._is_messages_queued_default (:obj:`bool`): Value to provide + class-defaults to `queued` kwarg if not provided during wrapped + method call. + self._msg_queue (:obj:`telegram.ext.messagequeue.MessageQueue`): + The actual `MessageQueue` used to delay outbond messages according + to specified time-limits. + + Wrapped method starts accepting the next kwargs: + + Args: + queued (:obj:`bool`, optional): if set to ``True``, the `MessageQueue` + is used to process output messages. + Defaults to `self._is_queued_out`. + isgroup (:obj:`bool`, optional): if set to ``True``, the message is + meant to be group-type (as there's no obvious way to determine its + type in other way at the moment). Group-type messages could have + additional processing delay according to limits set in + `self._out_queue`. + Defaults to ``False``. + + Returns: + Either :obj:`telegram.utils.promise.Promise` in case call is queued, + or original method's return value if it's not. + ''' + + @functools.wraps(method) + def wrapped(self, *args, **kwargs): + queued = kwargs.pop('queued', self._is_messages_queued_default) + isgroup = kwargs.pop('isgroup', False) + if queued: + prom = promise.Promise(method, args, kwargs) + return self._msg_queue(prom, isgroup) + return method(self, *args, **kwargs) + + return wrapped diff --git a/tests/test_messagequeue.py b/tests/test_messagequeue.py new file mode 100644 index 000000000..3aa33c75a --- /dev/null +++ b/tests/test_messagequeue.py @@ -0,0 +1,91 @@ +'''This module contains telegram.ext.messagequeue test logic''' +from __future__ import print_function, division + +import sys +import os +import time +import unittest + +sys.path.insert(0, os.path.dirname(__file__) + os.sep + '..') +from tests.base import BaseTest + +from telegram.ext import messagequeue as mq + + +class DelayQueueTest(BaseTest, unittest.TestCase): + + def __init__(self, *args, **kwargs): + self._N = kwargs.pop('N', 128) + self._msglimit = kwargs.pop('burst_limit', 30) + self._timelimit = kwargs.pop('time_limit_ms', 1000) + self._margin = kwargs.pop('margin_ms', 0) + isprint = kwargs.pop('isprint', False) + + def noprint(*args, **kwargs): + pass + + self._print = print if isprint else noprint + super(DelayQueueTest, self).__init__(*args, **kwargs) + + def setUp(self): + print = self._print + print('Self-test with N = {} msgs, burst_limit = {} msgs, ' + 'time_limit = {:.2f} ms, margin = {:.2f} ms' + ''.format(self._N, self._msglimit, self._timelimit, self._margin)) + self.testtimes = [] + + def testcall(): + self.testtimes.append(mq.curtime()) + + self.testcall = testcall + + def test_delayqueue_limits(self): + '''Test that DelayQueue dispatched calls don't hit time-window limit''' + print = self._print + dsp = mq.DelayQueue( + burst_limit=self._msglimit, time_limit_ms=self._timelimit, autostart=True) + print('Started dispatcher {}\nStatus: {}' + ''.format(dsp, ['inactive', 'active'][dsp.is_alive()])) + self.assertTrue(dsp.is_alive()) + + print('Dispatching {} calls @ {}'.format(self._N, time.asctime())) + + for i in range(self._N): + dsp(self.testcall) + + print('Queue filled, waiting 4 dispatch finish @ ' + str(time.asctime())) + + starttime = mq.curtime() + app_endtime = ( + (self._N * self._msglimit / + (1000 * self._timelimit)) + starttime + 20) # wait up to 20 sec more than needed + while not dsp._queue.empty() and mq.curtime() < app_endtime: + time.sleep(1) + self.assertTrue(dsp._queue.empty()) # check loop exit condition + + dsp.stop() + print('Dispatcher ' + ['stopped', '!NOT STOPPED!'][dsp.is_alive()] + ' @ ' + str( + time.asctime())) + self.assertFalse(dsp.is_alive()) + + self.assertTrue(self.testtimes or self._N == 0) + print('Calculating call time windows') + passes, fails = [], [] + delta = (self._timelimit - self._margin) / 1000 + it = enumerate(range(self._msglimit + 1, len(self.testtimes))) + for start, stop in it: + part = self.testtimes[start:stop] + if (part[-1] - part[0]) >= delta: + passes.append(part) + else: + fails.append(part) + + print('Tested: {}, Passed: {}, Failed: {}' + ''.format(len(passes + fails), len(passes), len(fails))) + if fails: + print('(!) Got mismatches: ' + ';\n'.join(map(str, fails))) + self.assertFalse(fails) + + +if __name__ == '__main__': + unittest.main()