From d1778d3973d4f94cd0d8629e75d7ad90a55beba4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Schmidt=20Cristian=20Hern=C3=A1n?= Date: Wed, 19 Feb 2020 04:36:06 -0300 Subject: [PATCH] Change the functions for load/save jobs (more safe for multi-threading) --- Code-snippets.md | 80 +++++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/Code-snippets.md b/Code-snippets.md index cf2f5d4..a067879 100644 --- a/Code-snippets.md +++ b/Code-snippets.md @@ -575,74 +575,70 @@ if __name__ == '__main__': Version 12 and up includes tools for [making your bot persistent](https://github.com/python-telegram-bot/python-telegram-bot/wiki/Making-your-bot-persistent). -#### Save and load jobs using pickle -The following snippet pickles the jobs in the job queue periodically and on bot shutdown and unpickles and queues them again on startup. Since `pickle` doesn't support threading primitives, they are converted. +#### Save and load jobs using pickle +The following snippet pickles the jobs in the job queue periodically and on bot shutdown, and unpickles and queues them again on startup. Since pickle doesn't support threading primitives, therefore their values and states are extracted (this information may change in the future, always check the `Job` documentation). -**Note:** Race condition for asynchronous jobs that use `job.job_queue`, `job.removed`, `job.schedule_removal` or `job.enabled` while the job is being pickled. +**Note:** `Job` is not yet safe for threads so eventually some special condition may occur. In a previous example, the content of `Job` was modified which resulted in some asynchronous processing errors; now the content of `Job` is extracted without modifying it which is much more safe. ```python import pickle -from threading import Event from time import time from datetime import timedelta +from telegram.ext import Updater, Job + JOBS_PICKLE = 'job_tuples.pickle' +# WARNING: This information may change in future versions (changes are planned) +JOB_DATA = ('callback', 'interval', 'repeat', 'context', 'days', 'name', 'tzinfo') +JOB_STATE = ('_remove', '_enabled') + def load_jobs(jq): - now = time() - with open(JOBS_PICKLE, 'rb') as fp: while True: try: - next_t, job = pickle.load(fp) + next_t, data, state = pickle.load(fp) except EOFError: - break # Loaded all job tuples + break # loaded all jobs - # Create threading primitives - enabled = job._enabled - removed = job._remove + # New object with the same data + job = Job(**{var: val for var, val in zip(JOB_DATA, data)}) - job._enabled = Event() - job._remove = Event() + # Restore the state it had + for var, val in zip(JOB_STATE, state): + attribute = getattr(job, var) + getattr(attribute, 'set' if val else 'clear')() - if enabled: - job._enabled.set() + job.job_queue = jq - if removed: - job._remove.set() - - next_t -= now # Convert from absolute to relative time + next_t -= time() # convert from absolute to relative time jq._put(job, next_t) def save_jobs(jq): - if jq: - job_tuples = jq._queue.queue - else: - job_tuples = [] + with jq._queue.mutex: # in case job_queue makes a change - with open(JOBS_PICKLE, 'wb') as fp: - for next_t, job in job_tuples: - # Back up objects - _job_queue = job._job_queue - _remove = job._remove - _enabled = job._enabled + if jq: + job_tuples = jq._queue.queue + else: + job_tuples = [] - # Replace un-pickleable threading primitives - job._job_queue = None # Will be reset in jq.put - job._remove = job.removed # Convert to boolean - job._enabled = job.enabled # Convert to boolean + with open(JOBS_PICKLE, 'wb') as fp: + for next_t, job in job_tuples: - # Pickle the job - pickle.dump((next_t, job), fp) + # This job is always created at the start + if job.name == 'save_jobs_job': + continue - # Restore objects - job._job_queue = _job_queue - job._remove = _remove - job._enabled = _enabled + # Threading primitives are not pickleable + data = tuple(getattr(job, var) for var in JOB_DATA) + state = tuple(getattr(job, var).is_set() for var in JOB_STATE) + + # Pickle the job + pickle.dump((next_t, data, state), fp) def save_jobs_job(context): @@ -650,7 +646,7 @@ def save_jobs_job(context): def main(): - # updater = Updater(..) + # updater = Updater(...) job_queue = updater.job_queue @@ -667,12 +663,14 @@ def main(): # updater.start_[polling|webhook]() # updater.idle() - # Run again after bot has been properly shut down + # Run again after bot has been properly shutdown save_jobs(job_queue) + if __name__ == '__main__': main() ``` + #### An (good) error handler The following snippet is an example of an error handler. It notifies the user when an error happens and notifies the dev(s) of the error, including the traceback and where it happend. The comments in the code try to explain exactly what happens when and why, so editing it to fit your special needs should be a breeze.