From ddf3a1fcad56aebe6ae9c40691b2a627d3b39168 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Sat, 20 Jan 2018 15:27:01 +0200 Subject: [PATCH] jobqueue: Thread safety fixes (#977) - Fix JobQueue.jobs to obtain a lock on the internal queue object prior to iterating over it. - Rename JobQueue.queue to JobQueue._queue. This shouldn't be accessible by the user directly, but rather only with sanitized thread safe methods. - JobQueue.interval_seconds - access self.interval only once to avoid race conditions. Fixes #968 --- telegram/ext/jobqueue.py | 31 +++++++++++-------------------- tests/test_jobqueue.py | 4 ++-- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index d2085639b..24b98a2e1 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -38,7 +38,7 @@ class JobQueue(object): """This class allows you to periodically perform tasks with the bot. Attributes: - queue (:obj:`PriorityQueue`): The queue that holds the Jobs. + _queue (:obj:`PriorityQueue`): The queue that holds the Jobs. bot (:class:`telegram.Bot`): Bot that's send to the handlers. Args: @@ -54,7 +54,7 @@ class JobQueue(object): if prevent_autostart is not None: warnings.warn("prevent_autostart is being deprecated, use `start` method instead.") - self.queue = PriorityQueue() + self._queue = PriorityQueue() self.bot = bot self.logger = logging.getLogger(self.__class__.__name__) self.__start_lock = Lock() @@ -88,7 +88,6 @@ class JobQueue(object): tomorrow. """ - warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.run_once', " "'JobQueue.run_daily' or 'JobQueue.run_repeating' instead") if job.job_queue is None: @@ -119,7 +118,7 @@ class JobQueue(object): self.logger.debug('Putting job %s with t=%f', job.name, next_t) - self.queue.put((next_t, job)) + self._queue.put((next_t, job)) # Wake up the loop if this job should be executed next self._set_next_peek(next_t) @@ -196,7 +195,6 @@ class JobQueue(object): queue. """ - job = Job(callback, interval=interval, repeat=True, @@ -227,7 +225,6 @@ class JobQueue(object): queue. """ - job = Job(callback, interval=datetime.timedelta(days=1), repeat=True, @@ -250,14 +247,13 @@ class JobQueue(object): def tick(self): """Run all jobs that are due and re-enqueue them with their interval.""" - now = time.time() self.logger.debug('Ticking jobs with t=%f', now) while True: try: - t, job = self.queue.get(False) + t, job = self._queue.get(False) except Empty: break @@ -270,7 +266,7 @@ class JobQueue(object): # 2. At the first iteration of the loop only if `self.put()` had triggered # `self.__tick` because `self._next_peek` wasn't set self.logger.debug("Next task isn't due yet. Finished!") - self.queue.put((t, job)) + self._queue.put((t, job)) self._set_next_peek(t) break @@ -298,7 +294,6 @@ class JobQueue(object): def start(self): """Starts the job_queue thread.""" - self.__start_lock.acquire() if not self._running: @@ -335,7 +330,6 @@ class JobQueue(object): def stop(self): """Stops the thread.""" - with self.__start_lock: self._running = False @@ -345,8 +339,8 @@ class JobQueue(object): def jobs(self): """Returns a tuple of all jobs that are currently in the ``JobQueue``.""" - - return tuple(job[1] for job in self.queue.queue if job) + with self._queue.mutex: + return tuple(job[1] for job in self._queue.queue if job) class Job(object): @@ -407,7 +401,6 @@ class Job(object): def run(self, bot): """Executes the callback function.""" - self.callback(bot, self) def schedule_removal(self): @@ -416,7 +409,6 @@ class Job(object): its callback function again. """ - self._remove.set() @property @@ -459,10 +451,11 @@ class Job(object): @property def interval_seconds(self): """:obj:`int`: The interval for this job in seconds.""" - if isinstance(self.interval, datetime.timedelta): - return self.interval.total_seconds() + interval = self.interval + if isinstance(interval, datetime.timedelta): + return interval.total_seconds() else: - return self.interval + return interval @property def repeat(self): @@ -478,7 +471,6 @@ class Job(object): @property def days(self): """Tuple[:obj:`int`]: Optional. Defines on which days of the week the job should run.""" - return self._days @days.setter @@ -498,7 +490,6 @@ class Job(object): @property def job_queue(self): """:class:`telegram.ext.JobQueue`: Optional. The ``JobQueue`` this job belongs to.""" - return self._job_queue @job_queue.setter diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index ae859e501..1b526151b 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -202,7 +202,7 @@ class TestJobQueue(object): expected_time = time.time() + delta + 60 * 60 * 24 job_queue.run_once(self.job_datetime_tests, when) - assert pytest.approx(job_queue.queue.get(False)[0]) == expected_time + assert pytest.approx(job_queue._queue.get(False)[0]) == expected_time def test_run_daily(self, job_queue): delta = 0.5 @@ -212,7 +212,7 @@ class TestJobQueue(object): job_queue.run_daily(self.job_run_once, time_of_day) sleep(0.6) assert self.result == 1 - assert pytest.approx(job_queue.queue.get(False)[0]) == expected_time + assert pytest.approx(job_queue._queue.get(False)[0]) == expected_time def test_warnings(self, job_queue): j = Job(self.job_run_once, repeat=False)