Change the functions for load/save jobs (more safe for multi-threading)

Schmidt Cristian Hernán 2020-02-19 04:36:06 -03:00
parent 76a1180c89
commit d1778d3973

@ -575,74 +575,70 @@ if __name__ == '__main__':
Version 12 and up includes tools for [making your bot persistent](https://github.com/python-telegram-bot/python-telegram-bot/wiki/Making-your-bot-persistent).
#### Save and load jobs using pickle
The following snippet pickles the jobs in the job queue periodically and on bot shutdown and unpickles and queues them again on startup. Since `pickle` doesn't support threading primitives, they are converted.
#### Save and load jobs using pickle
The following snippet pickles the jobs in the job queue periodically and on bot shutdown, and unpickles and queues them again on startup. Since pickle doesn't support threading primitives, therefore their values and states are extracted (this information may change in the future, always check the `Job` documentation).
**Note:** Race condition for asynchronous jobs that use `job.job_queue`, `job.removed`, `job.schedule_removal` or `job.enabled` while the job is being pickled.
**Note:** `Job` is not yet safe for threads so eventually some special condition may occur. In a previous example, the content of `Job` was modified which resulted in some asynchronous processing errors; now the content of `Job` is extracted without modifying it which is much more safe.
```python
import pickle
from threading import Event
from time import time
from datetime import timedelta
from telegram.ext import Updater, Job
JOBS_PICKLE = 'job_tuples.pickle'
# WARNING: This information may change in future versions (changes are planned)
JOB_DATA = ('callback', 'interval', 'repeat', 'context', 'days', 'name', 'tzinfo')
JOB_STATE = ('_remove', '_enabled')
def load_jobs(jq):
now = time()
with open(JOBS_PICKLE, 'rb') as fp:
while True:
try:
next_t, job = pickle.load(fp)
next_t, data, state = pickle.load(fp)
except EOFError:
break # Loaded all job tuples
break # loaded all jobs
# Create threading primitives
enabled = job._enabled
removed = job._remove
# New object with the same data
job = Job(**{var: val for var, val in zip(JOB_DATA, data)})
job._enabled = Event()
job._remove = Event()
# Restore the state it had
for var, val in zip(JOB_STATE, state):
attribute = getattr(job, var)
getattr(attribute, 'set' if val else 'clear')()
if enabled:
job._enabled.set()
job.job_queue = jq
if removed:
job._remove.set()
next_t -= now # Convert from absolute to relative time
next_t -= time() # convert from absolute to relative time
jq._put(job, next_t)
def save_jobs(jq):
if jq:
job_tuples = jq._queue.queue
else:
job_tuples = []
with jq._queue.mutex: # in case job_queue makes a change
with open(JOBS_PICKLE, 'wb') as fp:
for next_t, job in job_tuples:
# Back up objects
_job_queue = job._job_queue
_remove = job._remove
_enabled = job._enabled
if jq:
job_tuples = jq._queue.queue
else:
job_tuples = []
# Replace un-pickleable threading primitives
job._job_queue = None # Will be reset in jq.put
job._remove = job.removed # Convert to boolean
job._enabled = job.enabled # Convert to boolean
with open(JOBS_PICKLE, 'wb') as fp:
for next_t, job in job_tuples:
# Pickle the job
pickle.dump((next_t, job), fp)
# This job is always created at the start
if job.name == 'save_jobs_job':
continue
# Restore objects
job._job_queue = _job_queue
job._remove = _remove
job._enabled = _enabled
# Threading primitives are not pickleable
data = tuple(getattr(job, var) for var in JOB_DATA)
state = tuple(getattr(job, var).is_set() for var in JOB_STATE)
# Pickle the job
pickle.dump((next_t, data, state), fp)
def save_jobs_job(context):
@ -650,7 +646,7 @@ def save_jobs_job(context):
def main():
# updater = Updater(..)
# updater = Updater(...)
job_queue = updater.job_queue
@ -667,12 +663,14 @@ def main():
# updater.start_[polling|webhook]()
# updater.idle()
# Run again after bot has been properly shut down
# Run again after bot has been properly shutdown
save_jobs(job_queue)
if __name__ == '__main__':
main()
```
#### An (good) error handler
The following snippet is an example of an error handler. It notifies the user when an error happens and notifies the dev(s) of the error, including the traceback and where it happend. The comments in the code try to explain exactly what happens when and why, so editing it to fit your special needs should be a breeze.