diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 827001937..93b3efb15 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -258,11 +258,15 @@ class Updater: # Create & start threads self.job_queue.start() dispatcher_ready = Event() + polling_ready = Event() self._init_thread(self.dispatcher.start, "dispatcher", ready=dispatcher_ready) self._init_thread(self._start_polling, "updater", poll_interval, timeout, - read_latency, bootstrap_retries, clean, allowed_updates) + read_latency, bootstrap_retries, clean, allowed_updates, + ready=polling_ready) + self.logger.debug('Waiting for Dispatcher and polling to start') dispatcher_ready.wait() + polling_ready.wait() # Return the update queue so the main thread can insert updates return self.update_queue @@ -276,7 +280,8 @@ class Updater: clean=False, bootstrap_retries=0, webhook_url=None, - allowed_updates=None): + allowed_updates=None, + force_event_loop=False): """ Starts a small http server to listen for updates via webhook. If cert and key are not provided, the webhook will be started directly on @@ -284,6 +289,15 @@ class Updater: application. Else, the webhook will be started on https://listen:port/url_path + Note: + Due to an incompatibility of the Tornado library PTB uses for the webhook with Python + 3.8+ on Windows machines, PTB will attempt to set the event loop to + :attr:`asyncio.SelectorEventLoop` and raise an exception, if an incompatible event loop + has already been specified. See this `thread`_ for more details. To suppress the + exception, set :attr:`force_event_loop` to :obj:`True`. + + .. _thread: https://github.com/tornadoweb/tornado/issues/2608 + Args: listen (:obj:`str`, optional): IP-Address to listen on. Default ``127.0.0.1``. port (:obj:`int`, optional): Port the bot should be listening on. Default ``80``. @@ -303,6 +317,8 @@ class Updater: NAT, reverse proxy, etc. Default is derived from `listen`, `port` & `url_path`. allowed_updates (List[:obj:`str`], optional): Passed to :attr:`telegram.Bot.set_webhook`. + force_event_loop (:obj:`bool`, optional): Force using the current event loop. See above + note for details. Defaults to :obj:`False` Returns: :obj:`Queue`: The update queue that can be filled from the main thread. @@ -313,16 +329,23 @@ class Updater: self.running = True # Create & start threads + webhook_ready = Event() + dispatcher_ready = Event() self.job_queue.start() - self._init_thread(self.dispatcher.start, "dispatcher"), + self._init_thread(self.dispatcher.start, "dispatcher", dispatcher_ready) self._init_thread(self._start_webhook, "updater", listen, port, url_path, cert, - key, bootstrap_retries, clean, webhook_url, allowed_updates) + key, bootstrap_retries, clean, webhook_url, allowed_updates, + ready=webhook_ready, force_event_loop=force_event_loop) + + self.logger.debug('Waiting for Dispatcher and Webhook to start') + webhook_ready.wait() + dispatcher_ready.wait() # Return the update queue so the main thread can insert updates return self.update_queue def _start_polling(self, poll_interval, timeout, read_latency, bootstrap_retries, clean, - allowed_updates): # pragma: no cover + allowed_updates, ready=None): # pragma: no cover # Thread target of thread 'updater'. Runs in background, pulls # updates from Telegram and inserts them in the update queue of the # Dispatcher. @@ -354,6 +377,9 @@ class Updater: # broadcast it self.update_queue.put(exc) + if ready is not None: + ready.set() + self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates', poll_interval) @@ -410,7 +436,7 @@ class Updater: return current_interval def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean, - webhook_url, allowed_updates): + webhook_url, allowed_updates, ready=None, force_event_loop=False): self.logger.debug('Updater thread started (webhook)') use_ssl = cert is not None and key is not None if not url_path.startswith('/'): @@ -448,7 +474,7 @@ class Updater: self.logger.warning("cleaning updates is not supported if " "SSL-termination happens elsewhere; skipping") - self.httpd.serve_forever() + self.httpd.serve_forever(force_event_loop=force_event_loop, ready=ready) @staticmethod def _gen_webhook_url(listen, port, url_path): diff --git a/telegram/utils/webhookhandler.py b/telegram/utils/webhookhandler.py index ccda56491..644133774 100644 --- a/telegram/utils/webhookhandler.py +++ b/telegram/utils/webhookhandler.py @@ -16,10 +16,13 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. +import asyncio +import os import sys import logging from telegram import Update from threading import Lock + try: import ujson as json except ImportError: @@ -41,13 +44,17 @@ class WebhookServer: self.server_lock = Lock() self.shutdown_lock = Lock() - def serve_forever(self): + def serve_forever(self, force_event_loop=False, ready=None): with self.server_lock: - IOLoop().make_current() self.is_running = True self.logger.debug('Webhook Server started.') - self.http_server.listen(self.port, address=self.listen) + self._ensure_event_loop(force_event_loop=force_event_loop) self.loop = IOLoop.current() + self.http_server.listen(self.port, address=self.listen) + + if ready is not None: + ready.set() + self.loop.start() self.logger.debug('Webhook Server stopped.') self.is_running = False @@ -65,6 +72,42 @@ class WebhookServer: self.logger.debug('Exception happened during processing of request from %s', client_address, exc_info=True) + def _ensure_event_loop(self, force_event_loop=False): + """If there's no asyncio event loop set for the current thread - create one.""" + try: + loop = asyncio.get_event_loop() + if (not force_event_loop and os.name == 'nt' and sys.version_info >= (3, 8) + and isinstance(loop, asyncio.ProactorEventLoop)): + raise TypeError('`ProactorEventLoop` is incompatible with ' + 'Tornado. Please switch to `SelectorEventLoop`.') + except RuntimeError: + # Python 3.8 changed default asyncio event loop implementation on windows + # from SelectorEventLoop to ProactorEventLoop. At the time of this writing + # Tornado doesn't support ProactorEventLoop and suggests that end users + # change asyncio event loop policy to WindowsSelectorEventLoopPolicy. + # https://github.com/tornadoweb/tornado/issues/2608 + # To avoid changing the global event loop policy, we manually construct + # a SelectorEventLoop instance instead of using asyncio.new_event_loop(). + # Note that the fix is not applied in the main thread, as that can break + # user code in even more ways than changing the global event loop policy can, + # and because Updater always starts its webhook server in a separate thread. + # Ideally, we would want to check that Tornado actually raises the expected + # NotImplementedError, but it's not possible to cleanly recover from that + # exception in current Tornado version. + if (os.name == 'nt' + and sys.version_info >= (3, 8) + # OS+version check makes hasattr check redundant, but just to be sure + and hasattr(asyncio, 'WindowsProactorEventLoopPolicy') + and (isinstance( + asyncio.get_event_loop_policy(), + asyncio.WindowsProactorEventLoopPolicy))): # pylint: disable=E1101 + self.logger.debug( + 'Applying Tornado asyncio event loop fix for Python 3.8+ on Windows') + loop = asyncio.SelectorEventLoop() + else: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + class WebhookAppClass(tornado.web.Application): @@ -74,7 +117,7 @@ class WebhookAppClass(tornado.web.Application): handlers = [ (r"{}/?".format(webhook_path), WebhookHandler, self.shared_objects) - ] # noqa + ] # noqa tornado.web.Application.__init__(self, handlers) def log_request(self, handler): @@ -88,35 +131,6 @@ class WebhookHandler(tornado.web.RequestHandler): def __init__(self, application, request, **kwargs): super().__init__(application, request, **kwargs) self.logger = logging.getLogger(__name__) - self._init_asyncio_patch() - - def _init_asyncio_patch(self): - """set default asyncio policy to be compatible with tornado - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - do this as early as possible to make it a low priority and overrideable - ref: https://github.com/tornadoweb/tornado/issues/2608 - TODO: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - Copied from https://github.com/ipython/ipykernel/pull/456/ - """ - if sys.platform.startswith("win") and sys.version_info >= (3, 8): - import asyncio - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if isinstance(asyncio.get_event_loop_policy(), WindowsProactorEventLoopPolicy): - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) def initialize(self, bot, update_queue, default_quote=None): self.bot = bot diff --git a/tests/test_updater.py b/tests/test_updater.py index 7eb722ff9..832d88be2 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -16,11 +16,14 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. +import asyncio import logging import os import signal import sys -import asyncio +import threading +from contextlib import contextmanager + from flaky import flaky from functools import partial from queue import Queue @@ -36,37 +39,28 @@ import pytest from telegram import TelegramError, Message, User, Chat, Update, Bot from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter from telegram.ext import Updater, Dispatcher, DictPersistence +from telegram.utils.webhookhandler import WebhookServer signalskip = pytest.mark.skipif(sys.platform == 'win32', reason='Can\'t send signals without stopping ' 'whole process on windows') -if sys.platform.startswith("win") and sys.version_info >= (3, 8): - """set default asyncio policy to be compatible with tornado - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - do this as early as possible to make it a low priority and overrideable - ref: https://github.com/tornadoweb/tornado/issues/2608 - TODO: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - Copied from https://github.com/ipython/ipykernel/pull/456/ - """ - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) +ASYNCIO_LOCK = threading.Lock() + + +@contextmanager +def set_asyncio_event_loop(loop): + with ASYNCIO_LOCK: + try: + orig_lop = asyncio.get_event_loop() + except RuntimeError: + orig_lop = None + asyncio.set_event_loop(loop) + try: + yield + finally: + asyncio.set_event_loop(orig_lop) class TestUpdater: @@ -203,6 +197,106 @@ class TestUpdater: assert not updater.httpd.is_running updater.stop() + def test_start_webhook_no_warning_or_error_logs(self, caplog, updater, monkeypatch): + monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True) + monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True) + # prevent api calls from @info decorator when updater.bot.id is used in thread names + monkeypatch.setattr(updater.bot, 'bot', User(id=123, first_name='bot', is_bot=True)) + monkeypatch.setattr(updater.bot, '_commands', []) + + ip = '127.0.0.1' + port = randrange(1024, 49152) # Select random port + with caplog.at_level(logging.WARNING): + updater.start_webhook(ip, port) + updater.stop() + assert not caplog.records + + @pytest.mark.skipif(os.name != 'nt' or sys.version_info < (3, 8), + reason='Workaround only relevant on windows with py3.8+') + def test_start_webhook_ensure_event_loop(self, updater, monkeypatch): + def serve_forever(self, force_event_loop=False, ready=None): + with self.server_lock: + self.is_running = True + self._ensure_event_loop(force_event_loop=force_event_loop) + + if ready is not None: + ready.set() + + monkeypatch.setattr(WebhookServer, 'serve_forever', serve_forever) + monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True) + monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True) + + ip = '127.0.0.1' + port = randrange(1024, 49152) # Select random port + + with set_asyncio_event_loop(None): + updater._start_webhook( + ip, + port, + url_path='TOKEN', + cert=None, + key=None, + bootstrap_retries=0, + clean=False, + webhook_url=None, + allowed_updates=None) + + assert isinstance(asyncio.get_event_loop(), asyncio.SelectorEventLoop) + + @pytest.mark.skipif(os.name != 'nt' or sys.version_info < (3, 8), + reason='Workaround only relevant on windows with py3.8+') + def test_start_webhook_force_event_loop_false(self, updater, monkeypatch): + monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True) + monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True) + + ip = '127.0.0.1' + port = randrange(1024, 49152) # Select random port + + with set_asyncio_event_loop(asyncio.ProactorEventLoop()): + with pytest.raises(TypeError, match='`ProactorEventLoop` is incompatible'): + updater._start_webhook( + ip, + port, + url_path='TOKEN', + cert=None, + key=None, + bootstrap_retries=0, + clean=False, + webhook_url=None, + allowed_updates=None) + + @pytest.mark.skipif(os.name != 'nt' or sys.version_info < (3, 8), + reason='Workaround only relevant on windows with py3.8+') + def test_start_webhook_force_event_loop_true(self, updater, monkeypatch): + def serve_forever(self, force_event_loop=False, ready=None): + with self.server_lock: + self.is_running = True + self._ensure_event_loop(force_event_loop=force_event_loop) + + if ready is not None: + ready.set() + + monkeypatch.setattr(WebhookServer, 'serve_forever', serve_forever) + monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True) + monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True) + + ip = '127.0.0.1' + port = randrange(1024, 49152) # Select random port + + with set_asyncio_event_loop(asyncio.ProactorEventLoop()): + updater._start_webhook( + ip, + port, + url_path='TOKEN', + cert=None, + key=None, + bootstrap_retries=0, + clean=False, + webhook_url=None, + allowed_updates=None, + force_event_loop=True) + assert isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop) + def test_webhook_ssl(self, monkeypatch, updater): monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True) monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True) @@ -267,33 +361,6 @@ class TestUpdater: assert q.get(False).message.default_quote is True updater.stop() - @pytest.mark.skipif(not (sys.platform.startswith("win") and sys.version_info >= (3, 8)), - reason="only relevant on win with py>=3.8") - def test_webhook_tornado_win_py38_workaround(self, updater, monkeypatch): - updater._default_quote = True - q = Queue() - monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True) - monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True) - monkeypatch.setattr('telegram.ext.Dispatcher.process_update', lambda _, u: q.put(u)) - - ip = '127.0.0.1' - port = randrange(1024, 49152) # Select random port - updater.start_webhook( - ip, - port, - url_path='TOKEN') - sleep(.2) - - try: - from asyncio import (WindowsSelectorEventLoopPolicy) - except ImportError: - pass - # not affected - else: - assert isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy) - - updater.stop() - @pytest.mark.parametrize(('error',), argvalues=[(TelegramError(''),)], ids=('TelegramError',))