Start additional threads only when necessary (#415)

* Start all additional threads only when necessary.

* Deprecate prevent_autostart in the c'tor of JobQueue.
This commit is contained in:
Michael Elovskikh 2016-09-27 12:21:35 +05:00 committed by Noam Meltzer
parent 9d0e0386d9
commit 46657afa95
6 changed files with 22 additions and 15 deletions

View file

@ -21,6 +21,7 @@ The following wonderful people contributed directly or indirectly to this projec
- `jlmadurga <https://github.com/jlmadurga>`_ - `jlmadurga <https://github.com/jlmadurga>`_
- `Li-aung Yip <https://github.com/LiaungYip>`_ - `Li-aung Yip <https://github.com/LiaungYip>`_
- `macrojames <https://github.com/macrojames>`_ - `macrojames <https://github.com/macrojames>`_
- `Michael Elovskikh <https://github.com/wronglink>`_
- `naveenvhegde <https://github.com/naveenvhegde>`_ - `naveenvhegde <https://github.com/naveenvhegde>`_
- `njittam <https://github.com/njittam>`_ - `njittam <https://github.com/njittam>`_
- `Noam Meltzer <https://github.com/tsnoam>`_ - `Noam Meltzer <https://github.com/tsnoam>`_

View file

@ -84,6 +84,7 @@ class Dispatcher(object):
self.bot = bot self.bot = bot
self.update_queue = update_queue self.update_queue = update_queue
self.job_queue = job_queue self.job_queue = job_queue
self.workers = workers
self.handlers = {} self.handlers = {}
""":type: dict[int, list[Handler]""" """:type: dict[int, list[Handler]"""
@ -105,8 +106,6 @@ class Dispatcher(object):
else: else:
self._set_singleton(None) self._set_singleton(None)
self._init_async_threads(uuid4(), workers)
@classmethod @classmethod
def _reset_singleton(cls): def _reset_singleton(cls):
# NOTE: This method was added mainly for test_updater benefit and specifically pypy. Never # NOTE: This method was added mainly for test_updater benefit and specifically pypy. Never
@ -193,6 +192,7 @@ class Dispatcher(object):
self.logger.error(msg) self.logger.error(msg)
raise TelegramError(msg) raise TelegramError(msg)
self._init_async_threads(uuid4(), self.workers)
self.running = True self.running = True
self.logger.debug('Dispatcher started') self.logger.debug('Dispatcher started')

View file

@ -20,6 +20,7 @@
import logging import logging
import time import time
import warnings
from threading import Thread, Lock, Event from threading import Thread, Lock, Event
from queue import PriorityQueue, Empty from queue import PriorityQueue, Empty
@ -30,15 +31,19 @@ class JobQueue(object):
Attributes: Attributes:
queue (PriorityQueue): queue (PriorityQueue):
bot (Bot): bot (Bot):
prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started
automatically. Defaults to ``False``
Args: Args:
bot (Bot): The bot instance that should be passed to the jobs bot (Bot): The bot instance that should be passed to the jobs
Deprecated: 5.2
prevent_autostart (Optional[bool]): Thread does not start during initialisation.
Use `start` method instead.
""" """
def __init__(self, bot, prevent_autostart=False): def __init__(self, bot, prevent_autostart=None):
if prevent_autostart is not None:
warnings.warn("prevent_autostart is being deprecated, use `start` method instead.")
self.queue = PriorityQueue() self.queue = PriorityQueue()
self.bot = bot self.bot = bot
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
@ -51,12 +56,8 @@ class JobQueue(object):
""":type: float""" """:type: float"""
self._running = False self._running = False
if not prevent_autostart:
self.logger.debug('Auto-starting %s', self.__class__.__name__)
self.start()
def put(self, job, next_t=None): def put(self, job, next_t=None):
"""Queue a new job. If the JobQueue is not running, it will be started. """Queue a new job.
Args: Args:
job (Job): The ``Job`` instance representing the new job job (Job): The ``Job`` instance representing the new job

View file

@ -157,6 +157,7 @@ class Updater(object):
self.running = True self.running = True
# Create & start threads # Create & start threads
self.job_queue.start()
self._init_thread(self.dispatcher.start, "dispatcher") self._init_thread(self.dispatcher.start, "dispatcher")
self._init_thread(self._start_polling, "updater", poll_interval, timeout, self._init_thread(self._start_polling, "updater", poll_interval, timeout,
network_delay, bootstrap_retries, clean) network_delay, bootstrap_retries, clean)
@ -208,6 +209,7 @@ class Updater(object):
self.running = True self.running = True
# Create & start threads # Create & start threads
self.job_queue.start()
self._init_thread(self.dispatcher.start, "dispatcher"), self._init_thread(self.dispatcher.start, "dispatcher"),
self._init_thread(self._start_webhook, "updater", listen, port, url_path, cert, self._init_thread(self._start_webhook, "updater", listen, port, url_path, cert,
key, bootstrap_retries, clean, webhook_url) key, bootstrap_retries, clean, webhook_url)

View file

@ -51,6 +51,7 @@ class JobQueueTest(BaseTest, unittest.TestCase):
def setUp(self): def setUp(self):
self.jq = JobQueue(MockBot('jobqueue_test')) self.jq = JobQueue(MockBot('jobqueue_test'))
self.jq.start()
self.result = 0 self.result = 0
def tearDown(self): def tearDown(self):
@ -143,7 +144,6 @@ class JobQueueTest(BaseTest, unittest.TestCase):
def test_error(self): def test_error(self):
self.jq.put(Job(self.job2, 0.1)) self.jq.put(Job(self.job2, 0.1))
self.jq.put(Job(self.job1, 0.2)) self.jq.put(Job(self.job1, 0.2))
self.jq.start()
sleep(0.5) sleep(0.5)
self.assertEqual(2, self.result) self.assertEqual(2, self.result)
@ -158,6 +158,7 @@ class JobQueueTest(BaseTest, unittest.TestCase):
def test_inUpdater(self): def test_inUpdater(self):
u = Updater(bot="MockBot") u = Updater(bot="MockBot")
u.job_queue.start()
try: try:
u.job_queue.put(Job(self.job1, 0.5)) u.job_queue.put(Job(self.job1, 0.5))
sleep(0.75) sleep(0.75)

View file

@ -427,10 +427,12 @@ class UpdaterTest(BaseTest, unittest.TestCase):
q.put(current_thread().name) q.put(current_thread().name)
sleep(1.2) sleep(1.2)
d1 = Dispatcher(MockBot('disp1'), Queue(), workers=1) d1 = Dispatcher(MockBot('disp1'), Queue())
d2 = Dispatcher(MockBot('disp2'), Queue(), workers=1) d2 = Dispatcher(MockBot('disp2'), Queue())
q1 = Queue() q1 = Queue()
q2 = Queue() q2 = Queue()
d1._init_async_threads('test_1', workers=1)
d2._init_async_threads('test_2', workers=1)
try: try:
d1.run_async(get_dispatcher_name, q1) d1.run_async(get_dispatcher_name, q1)
@ -622,9 +624,9 @@ class UpdaterTest(BaseTest, unittest.TestCase):
def test_start_dispatcher_twice(self): def test_start_dispatcher_twice(self):
self._setup_updater('', messages=0) self._setup_updater('', messages=0)
d = self.updater.dispatcher
self.updater.start_polling(0.1) self.updater.start_polling(0.1)
d.start() sleep(0.5)
self.updater.dispatcher.start()
def test_bootstrap_retries_success(self): def test_bootstrap_retries_success(self):
retries = 3 retries = 3