jobqueue: Thread safety fixes (#977)

- Fix JobQueue.jobs to obtain a lock on the internal queue object prior
   to iterating over it.

 - Rename JobQueue.queue to JobQueue._queue. This shouldn't be
   accessible by the user directly, but rather only with sanitized
   thread safe methods.

 - JobQueue.interval_seconds - access self.interval only once to avoid
   race conditions.

Fixes #968
This commit is contained in:
Noam Meltzer 2018-01-20 15:27:01 +02:00 committed by GitHub
parent 820f4e1d59
commit ddf3a1fcad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 22 deletions

View file

@ -38,7 +38,7 @@ class JobQueue(object):
"""This class allows you to periodically perform tasks with the bot.
Attributes:
queue (:obj:`PriorityQueue`): The queue that holds the Jobs.
_queue (:obj:`PriorityQueue`): The queue that holds the Jobs.
bot (:class:`telegram.Bot`): Bot that's send to the handlers.
Args:
@ -54,7 +54,7 @@ class JobQueue(object):
if prevent_autostart is not None:
warnings.warn("prevent_autostart is being deprecated, use `start` method instead.")
self.queue = PriorityQueue()
self._queue = PriorityQueue()
self.bot = bot
self.logger = logging.getLogger(self.__class__.__name__)
self.__start_lock = Lock()
@ -88,7 +88,6 @@ class JobQueue(object):
tomorrow.
"""
warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.run_once', "
"'JobQueue.run_daily' or 'JobQueue.run_repeating' instead")
if job.job_queue is None:
@ -119,7 +118,7 @@ class JobQueue(object):
self.logger.debug('Putting job %s with t=%f', job.name, next_t)
self.queue.put((next_t, job))
self._queue.put((next_t, job))
# Wake up the loop if this job should be executed next
self._set_next_peek(next_t)
@ -196,7 +195,6 @@ class JobQueue(object):
queue.
"""
job = Job(callback,
interval=interval,
repeat=True,
@ -227,7 +225,6 @@ class JobQueue(object):
queue.
"""
job = Job(callback,
interval=datetime.timedelta(days=1),
repeat=True,
@ -250,14 +247,13 @@ class JobQueue(object):
def tick(self):
"""Run all jobs that are due and re-enqueue them with their interval."""
now = time.time()
self.logger.debug('Ticking jobs with t=%f', now)
while True:
try:
t, job = self.queue.get(False)
t, job = self._queue.get(False)
except Empty:
break
@ -270,7 +266,7 @@ class JobQueue(object):
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self.queue.put((t, job))
self._queue.put((t, job))
self._set_next_peek(t)
break
@ -298,7 +294,6 @@ class JobQueue(object):
def start(self):
"""Starts the job_queue thread."""
self.__start_lock.acquire()
if not self._running:
@ -335,7 +330,6 @@ class JobQueue(object):
def stop(self):
"""Stops the thread."""
with self.__start_lock:
self._running = False
@ -345,8 +339,8 @@ class JobQueue(object):
def jobs(self):
"""Returns a tuple of all jobs that are currently in the ``JobQueue``."""
return tuple(job[1] for job in self.queue.queue if job)
with self._queue.mutex:
return tuple(job[1] for job in self._queue.queue if job)
class Job(object):
@ -407,7 +401,6 @@ class Job(object):
def run(self, bot):
"""Executes the callback function."""
self.callback(bot, self)
def schedule_removal(self):
@ -416,7 +409,6 @@ class Job(object):
its callback function again.
"""
self._remove.set()
@property
@ -459,10 +451,11 @@ class Job(object):
@property
def interval_seconds(self):
""":obj:`int`: The interval for this job in seconds."""
if isinstance(self.interval, datetime.timedelta):
return self.interval.total_seconds()
interval = self.interval
if isinstance(interval, datetime.timedelta):
return interval.total_seconds()
else:
return self.interval
return interval
@property
def repeat(self):
@ -478,7 +471,6 @@ class Job(object):
@property
def days(self):
"""Tuple[:obj:`int`]: Optional. Defines on which days of the week the job should run."""
return self._days
@days.setter
@ -498,7 +490,6 @@ class Job(object):
@property
def job_queue(self):
""":class:`telegram.ext.JobQueue`: Optional. The ``JobQueue`` this job belongs to."""
return self._job_queue
@job_queue.setter

View file

@ -202,7 +202,7 @@ class TestJobQueue(object):
expected_time = time.time() + delta + 60 * 60 * 24
job_queue.run_once(self.job_datetime_tests, when)
assert pytest.approx(job_queue.queue.get(False)[0]) == expected_time
assert pytest.approx(job_queue._queue.get(False)[0]) == expected_time
def test_run_daily(self, job_queue):
delta = 0.5
@ -212,7 +212,7 @@ class TestJobQueue(object):
job_queue.run_daily(self.job_run_once, time_of_day)
sleep(0.6)
assert self.result == 1
assert pytest.approx(job_queue.queue.get(False)[0]) == expected_time
assert pytest.approx(job_queue._queue.get(False)[0]) == expected_time
def test_warnings(self, job_queue):
j = Job(self.job_run_once, repeat=False)