Make Integration of APScheduler into JobQueue More Explicit (#3695)

This commit is contained in:
Bibo-Joshi 2023-06-02 22:17:46 +02:00 committed by GitHub
parent bf54599618
commit 9c8d6efe7a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 15 deletions

View file

@ -158,6 +158,30 @@ class JobQueue(Generic[CCT]):
return application return application
raise RuntimeError("The application instance is no longer alive.") raise RuntimeError("The application instance is no longer alive.")
@staticmethod
async def job_callback(job_queue: "JobQueue[CCT]", job: "Job[CCT]") -> None:
"""This method is used as a callback for the APScheduler jobs.
More precisely, the ``func`` argument of :class:`apscheduler.job.Job` is set to this method
and the ``arg`` argument (representing positional arguments to ``func``) is set to a tuple
containing the :class:`JobQueue` itself and the :class:`~telegram.ext.Job` instance.
Tip:
This method is a static method rather than a bound method. This makes the arguments
more transparent and allows for easier handling of PTBs integration of APScheduler
when utilizing advanced features of APScheduler.
Hint:
This method is effectively a wrapper for :meth:`telegram.ext.Job.run`.
.. versionadded:: NEXT.VERSION
Args:
job_queue (:class:`JobQueue`): The job queue that created the job.
job (:class:`~telegram.ext.Job`): The job to run.
"""
await job.run(job_queue.application)
def run_once( def run_once(
self, self,
callback: JobCallback[CCT], callback: JobCallback[CCT],
@ -230,11 +254,11 @@ class JobQueue(Generic[CCT]):
date_time = self._parse_time_input(when, shift_day=True) date_time = self._parse_time_input(when, shift_day=True)
j = self.scheduler.add_job( j = self.scheduler.add_job(
job.run, self.job_callback,
name=name, name=name,
trigger="date", trigger="date",
run_date=date_time, run_date=date_time,
args=(self.application,), args=(self, job),
timezone=date_time.tzinfo or self.scheduler.timezone, timezone=date_time.tzinfo or self.scheduler.timezone,
**job_kwargs, **job_kwargs,
) )
@ -356,9 +380,9 @@ class JobQueue(Generic[CCT]):
interval = interval.total_seconds() interval = interval.total_seconds()
j = self.scheduler.add_job( j = self.scheduler.add_job(
job.run, self.job_callback,
trigger="interval", trigger="interval",
args=(self.application,), args=(self, job),
start_date=dt_first, start_date=dt_first,
end_date=dt_last, end_date=dt_last,
seconds=interval, seconds=interval,
@ -433,9 +457,9 @@ class JobQueue(Generic[CCT]):
job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id) job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id)
j = self.scheduler.add_job( j = self.scheduler.add_job(
job.run, self.job_callback,
trigger="cron", trigger="cron",
args=(self.application,), args=(self, job),
name=name, name=name,
day="last" if day == -1 else day, day="last" if day == -1 else day,
hour=when.hour, hour=when.hour,
@ -523,9 +547,9 @@ class JobQueue(Generic[CCT]):
job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id) job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id)
j = self.scheduler.add_job( j = self.scheduler.add_job(
job.run, self.job_callback,
name=name, name=name,
args=(self.application,), args=(self, job),
trigger="cron", trigger="cron",
day_of_week=",".join([self._CRON_MAPPING[d] for d in days]), day_of_week=",".join([self._CRON_MAPPING[d] for d in days]),
hour=time.hour, hour=time.hour,
@ -585,7 +609,7 @@ class JobQueue(Generic[CCT]):
name = name or callback.__name__ name = name or callback.__name__
job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id) job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id)
j = self.scheduler.add_job(job.run, args=(self.application,), name=name, **job_kwargs) j = self.scheduler.add_job(self.job_callback, args=(self, job), name=name, **job_kwargs)
job._job = j # pylint: disable=protected-access job._job = j # pylint: disable=protected-access
return job return job
@ -625,10 +649,7 @@ class JobQueue(Generic[CCT]):
Returns: Returns:
Tuple[:class:`Job`]: Tuple of all *scheduled* jobs. Tuple[:class:`Job`]: Tuple of all *scheduled* jobs.
""" """
return tuple( return tuple(Job.from_aps_job(job) for job in self.scheduler.get_jobs())
Job._from_aps_job(job) # pylint: disable=protected-access
for job in self.scheduler.get_jobs()
)
def get_jobs_by_name(self, name: str) -> Tuple["Job[CCT]", ...]: def get_jobs_by_name(self, name: str) -> Tuple["Job[CCT]", ...]:
"""Returns a tuple of all *pending/scheduled* jobs with the given name that are currently """Returns a tuple of all *pending/scheduled* jobs with the given name that are currently
@ -821,8 +842,25 @@ class Job(Generic[CCT]):
return self.job.next_run_time return self.job.next_run_time
@classmethod @classmethod
def _from_aps_job(cls, job: "APSJob") -> "Job[CCT]": def from_aps_job(cls, aps_job: "APSJob") -> "Job[CCT]":
return job.func.__self__ """Provides the :class:`telegram.ext.Job` that is associated with the given APScheduler
job.
Tip:
This method can be useful when using advanced APScheduler features along with
:class:`telegram.ext.JobQueue`.
.. versionadded:: NEXT.VERSION
Args:
aps_job (:class:`apscheduler.job.Job`): The APScheduler job
Returns:
:class:`telegram.ext.Job`
"""
ext_job = aps_job.args[1]
ext_job._job = aps_job # pylint: disable=protected-access
return ext_job
def __getattr__(self, item: str) -> object: def __getattr__(self, item: str) -> object:
try: try:

View file

@ -615,3 +615,29 @@ class TestJobQueue:
else: else:
await asyncio.sleep(0.1) # unfortunately we will get a CancelledError here await asyncio.sleep(0.1) # unfortunately we will get a CancelledError here
assert task.done() assert task.done()
async def test_from_aps_job(self, job_queue):
job = job_queue.run_once(self.job_run_once, 0.1, name="test_job")
aps_job = job_queue.scheduler.get_job(job.id)
tg_job = Job.from_aps_job(aps_job)
assert tg_job is job
assert tg_job.job is aps_job
async def test_from_aps_job_missing_reference(self, job_queue):
"""We manually create a ext.Job and an aps job such that the former has no reference to the
latter. Then we test that Job.from_aps_job() still sets the reference correctly.
"""
job = Job(self.job_run_once)
aps_job = job_queue.scheduler.add_job(
func=job_queue.job_callback,
args=(job_queue, job),
trigger="interval",
seconds=2,
id="test_id",
)
assert job.job is None
tg_job = Job.from_aps_job(aps_job)
assert tg_job is job
assert tg_job.job is aps_job