Pass Failing Jobs to Error Handlers (#2692)

This commit is contained in:
Bibo-Joshi 2021-10-03 20:00:54 +02:00 committed by Hinrich Mahler
parent 51488bb4be
commit 90b82eed99
5 changed files with 72 additions and 52 deletions

View file

@ -6,3 +6,4 @@ telegram.ext.Job
.. autoclass:: telegram.ext.Job
:members:
:show-inheritance:
:special-members: __call__

View file

@ -86,7 +86,11 @@ class CallbackContext(Generic[UD, CD, BD]):
that raised the error. Only present when the raising function was run asynchronously
using :meth:`telegram.ext.Dispatcher.run_async`.
job (:class:`telegram.ext.Job`): Optional. The job which originated this callback.
Only present when passed to the callback of :class:`telegram.ext.Job`.
Only present when passed to the callback of :class:`telegram.ext.Job` or in error
handlers if the error is caused by a job.
.. versionchanged:: 14.0
:attr:`job` is now also present in error handlers if the error is caused by a job.
"""
@ -231,6 +235,7 @@ class CallbackContext(Generic[UD, CD, BD]):
dispatcher: 'Dispatcher[CCT, UD, CD, BD]',
async_args: Union[List, Tuple] = None,
async_kwargs: Dict[str, object] = None,
job: 'Job' = None,
) -> 'CCT':
"""
Constructs an instance of :class:`telegram.ext.CallbackContext` to be passed to the error
@ -244,12 +249,15 @@ class CallbackContext(Generic[UD, CD, BD]):
error (:obj:`Exception`): The error.
dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher associated with this
context.
async_args (List[:obj:`object`]): Optional. Positional arguments of the function that
async_args (List[:obj:`object`], optional): Positional arguments of the function that
raised the error. Pass only when the raising function was run asynchronously using
:meth:`telegram.ext.Dispatcher.run_async`.
async_kwargs (Dict[:obj:`str`, :obj:`object`]): Optional. Keyword arguments of the
async_kwargs (Dict[:obj:`str`, :obj:`object`], optional): Keyword arguments of the
function that raised the error. Pass only when the raising function was run
asynchronously using :meth:`telegram.ext.Dispatcher.run_async`.
job (:class:`telegram.ext.Job`, optional): The job associated with the error.
.. versionadded:: 14.0
Returns:
:class:`telegram.ext.CallbackContext`
@ -258,6 +266,7 @@ class CallbackContext(Generic[UD, CD, BD]):
self.error = error
self.async_args = async_args
self.async_kwargs = async_kwargs
self.job = job
return self
@classmethod

View file

@ -52,8 +52,7 @@ from telegram.ext.utils.types import CCT, UD, CD, BD
if TYPE_CHECKING:
from telegram import Bot
from telegram.ext import JobQueue
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext import JobQueue, Job, CallbackContext
DEFAULT_GROUP: int = 0
@ -678,6 +677,7 @@ class Dispatcher(Generic[CCT, UD, CD, BD]):
update: Optional[object],
error: Exception,
promise: Promise = None,
job: 'Job' = None,
) -> bool:
"""Dispatches an error by passing it to all error handlers registered with
:meth:`add_error_handler`. If one of the error handlers raises
@ -696,6 +696,9 @@ class Dispatcher(Generic[CCT, UD, CD, BD]):
error (:obj:`Exception`): The error that was raised.
promise (:class:`telegram.utils.Promise`, optional): The promise whose pooled function
raised the error.
job (:class:`telegram.ext.Job`, optional): The job that caused the error.
.. versionadded:: 14.0
Returns:
:obj:`bool`: :obj:`True` if one of the error handlers raised
@ -707,7 +710,12 @@ class Dispatcher(Generic[CCT, UD, CD, BD]):
if self.error_handlers:
for callback, run_async in self.error_handlers.items(): # pylint: disable=W0621
context = self.context_types.context.from_error(
update, error, self, async_args=async_args, async_kwargs=async_kwargs
update=update,
error=error,
dispatcher=self,
async_args=async_args,
async_kwargs=async_kwargs,
job=job,
)
if run_async:
self.run_async(callback, update, context, update=update)

View file

@ -19,20 +19,17 @@
"""This module contains the classes JobQueue and Job."""
import datetime
import logging
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union, cast, overload
from typing import TYPE_CHECKING, Callable, Optional, Tuple, Union, cast, overload
import pytz
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, JobEvent
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.job import Job as APSJob
from telegram.ext.callbackcontext import CallbackContext
from telegram.utils.types import JSONDict
from .extbot import ExtBot
if TYPE_CHECKING:
from telegram.ext import Dispatcher
from telegram.ext import Dispatcher, CallbackContext
import apscheduler.job # noqa: F401
@ -45,35 +42,15 @@ class JobQueue:
"""
__slots__ = ('_dispatcher', 'logger', 'scheduler')
__slots__ = ('_dispatcher', 'scheduler')
def __init__(self) -> None:
self._dispatcher: 'Dispatcher' = None # type: ignore[assignment]
self.logger = logging.getLogger(self.__class__.__name__)
self.scheduler = BackgroundScheduler(timezone=pytz.utc)
self.scheduler.add_listener(
self._update_persistence, mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR
)
# Dispatch errors and don't log them in the APS logger
def aps_log_filter(record): # type: ignore
return 'raised an exception' not in record.msg
logging.getLogger('apscheduler.executors.default').addFilter(aps_log_filter)
self.scheduler.add_listener(self._dispatch_error, EVENT_JOB_ERROR)
def _build_args(self, job: 'Job') -> List[CallbackContext]:
return [self._dispatcher.context_types.context.from_job(job, self._dispatcher)]
def _tz_now(self) -> datetime.datetime:
return datetime.datetime.now(self.scheduler.timezone)
def _update_persistence(self, _: JobEvent) -> None:
self._dispatcher.update_persistence()
def _dispatch_error(self, event: JobEvent) -> None:
self._dispatcher.dispatch_error(None, event.exception)
@overload
def _parse_time_input(self, time: None, shift_day: bool = False) -> None:
...
@ -170,11 +147,11 @@ class JobQueue:
date_time = self._parse_time_input(when, shift_day=True)
j = self.scheduler.add_job(
callback,
job,
name=name,
trigger='date',
run_date=date_time,
args=self._build_args(job),
args=(self._dispatcher,),
timezone=date_time.tzinfo or self.scheduler.timezone,
**job_kwargs,
)
@ -262,9 +239,9 @@ class JobQueue:
interval = interval.total_seconds()
j = self.scheduler.add_job(
callback,
job,
trigger='interval',
args=self._build_args(job),
args=(self._dispatcher,),
start_date=dt_first,
end_date=dt_last,
seconds=interval,
@ -318,9 +295,9 @@ class JobQueue:
job = Job(callback, context, name)
j = self.scheduler.add_job(
callback,
job,
trigger='cron',
args=self._build_args(job),
args=(self._dispatcher,),
name=name,
day='last' if day == -1 else day,
hour=when.hour,
@ -375,9 +352,9 @@ class JobQueue:
job = Job(callback, context, name)
j = self.scheduler.add_job(
callback,
job,
name=name,
args=self._build_args(job),
args=(self._dispatcher,),
trigger='cron',
day_of_week=','.join([str(d) for d in days]),
hour=time.hour,
@ -417,7 +394,7 @@ class JobQueue:
name = name or callback.__name__
job = Job(callback, context, name)
j = self.scheduler.add_job(callback, args=self._build_args(job), name=name, **job_kwargs)
j = self.scheduler.add_job(job, args=(self._dispatcher,), name=name, **job_kwargs)
job.job = j
return job
@ -507,11 +484,39 @@ class Job:
self.job = cast(APSJob, job) # skipcq: PTC-W0052
def run(self, dispatcher: 'Dispatcher') -> None:
"""Executes the callback function independently of the jobs schedule."""
"""Executes the callback function independently of the jobs schedule. Also calls
:meth:`telegram.ext.Dispatcher.update_persistence`.
.. versionchaged:: 14.0
Calls :meth:`telegram.ext.Dispatcher.update_persistence`.
Args:
dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher this job is associated
with.
"""
try:
self.callback(dispatcher.context_types.context.from_job(self, dispatcher))
except Exception as exc:
dispatcher.dispatch_error(None, exc)
dispatcher.dispatch_error(None, exc, job=self)
finally:
dispatcher.update_persistence(None)
def __call__(self, dispatcher: 'Dispatcher') -> None:
"""Shortcut for::
job.run(dispatcher)
Warning:
The fact that jobs are callable should be considered an implementation detail and not
as part of PTBs public API.
.. versionadded:: 14.0
Args:
dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher this job is associated
with.
"""
self.run(dispatcher=dispatcher)
def schedule_removal(self) -> None:
"""
@ -550,12 +555,7 @@ class Job:
@classmethod
def _from_aps_job(cls, job: APSJob) -> 'Job':
# context based callbacks
if len(job.args) == 1:
context = job.args[0].job.context
else:
context = job.args[1].context
return cls(job.func, context=context, name=job.name, job=job)
return job.func
def __getattr__(self, item: str) -> object:
return getattr(self.job, item)

View file

@ -95,7 +95,7 @@ class TestJobQueue:
self.result += 1
def error_handler_context(self, update, context):
self.received_error = str(context.error)
self.received_error = (str(context.error), context.job)
def error_handler_raise_error(self, *args):
raise Exception('Failing bigly')
@ -425,10 +425,12 @@ class TestJobQueue:
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(0.1)
assert self.received_error == 'Test Error'
assert self.received_error[0] == 'Test Error'
assert self.received_error[1] is job
self.received_error = None
job.run(dp)
assert self.received_error == 'Test Error'
assert self.received_error[0] == 'Test Error'
assert self.received_error[1] is job
# Remove handler
dp.remove_error_handler(self.error_handler_context)