Fix Webhook not working on Windows with Python 3.8+ (#2067)

* Fix start_webhook NotImplementedError on windows with python3.8

* Fine-tune and add tests.

* Minor fixes

* typos

* Make Codacy happy

Co-authored-by: n5y <41209360+n5y@users.noreply.github.com>
This commit is contained in:
Bibo-Joshi 2020-08-25 22:21:24 +02:00 committed by GitHub
parent a0720b9ac6
commit bb34c79909
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 200 additions and 93 deletions

View file

@ -258,11 +258,15 @@ class Updater:
# Create & start threads # Create & start threads
self.job_queue.start() self.job_queue.start()
dispatcher_ready = Event() dispatcher_ready = Event()
polling_ready = Event()
self._init_thread(self.dispatcher.start, "dispatcher", ready=dispatcher_ready) self._init_thread(self.dispatcher.start, "dispatcher", ready=dispatcher_ready)
self._init_thread(self._start_polling, "updater", poll_interval, timeout, self._init_thread(self._start_polling, "updater", poll_interval, timeout,
read_latency, bootstrap_retries, clean, allowed_updates) read_latency, bootstrap_retries, clean, allowed_updates,
ready=polling_ready)
self.logger.debug('Waiting for Dispatcher and polling to start')
dispatcher_ready.wait() dispatcher_ready.wait()
polling_ready.wait()
# Return the update queue so the main thread can insert updates # Return the update queue so the main thread can insert updates
return self.update_queue return self.update_queue
@ -276,7 +280,8 @@ class Updater:
clean=False, clean=False,
bootstrap_retries=0, bootstrap_retries=0,
webhook_url=None, webhook_url=None,
allowed_updates=None): allowed_updates=None,
force_event_loop=False):
""" """
Starts a small http server to listen for updates via webhook. If cert Starts a small http server to listen for updates via webhook. If cert
and key are not provided, the webhook will be started directly on and key are not provided, the webhook will be started directly on
@ -284,6 +289,15 @@ class Updater:
application. Else, the webhook will be started on application. Else, the webhook will be started on
https://listen:port/url_path https://listen:port/url_path
Note:
Due to an incompatibility of the Tornado library PTB uses for the webhook with Python
3.8+ on Windows machines, PTB will attempt to set the event loop to
:attr:`asyncio.SelectorEventLoop` and raise an exception, if an incompatible event loop
has already been specified. See this `thread`_ for more details. To suppress the
exception, set :attr:`force_event_loop` to :obj:`True`.
.. _thread: https://github.com/tornadoweb/tornado/issues/2608
Args: Args:
listen (:obj:`str`, optional): IP-Address to listen on. Default ``127.0.0.1``. listen (:obj:`str`, optional): IP-Address to listen on. Default ``127.0.0.1``.
port (:obj:`int`, optional): Port the bot should be listening on. Default ``80``. port (:obj:`int`, optional): Port the bot should be listening on. Default ``80``.
@ -303,6 +317,8 @@ class Updater:
NAT, reverse proxy, etc. Default is derived from `listen`, `port` & `url_path`. NAT, reverse proxy, etc. Default is derived from `listen`, `port` & `url_path`.
allowed_updates (List[:obj:`str`], optional): Passed to allowed_updates (List[:obj:`str`], optional): Passed to
:attr:`telegram.Bot.set_webhook`. :attr:`telegram.Bot.set_webhook`.
force_event_loop (:obj:`bool`, optional): Force using the current event loop. See above
note for details. Defaults to :obj:`False`
Returns: Returns:
:obj:`Queue`: The update queue that can be filled from the main thread. :obj:`Queue`: The update queue that can be filled from the main thread.
@ -313,16 +329,23 @@ class Updater:
self.running = True self.running = True
# Create & start threads # Create & start threads
webhook_ready = Event()
dispatcher_ready = Event()
self.job_queue.start() self.job_queue.start()
self._init_thread(self.dispatcher.start, "dispatcher"), self._init_thread(self.dispatcher.start, "dispatcher", dispatcher_ready)
self._init_thread(self._start_webhook, "updater", listen, port, url_path, cert, self._init_thread(self._start_webhook, "updater", listen, port, url_path, cert,
key, bootstrap_retries, clean, webhook_url, allowed_updates) key, bootstrap_retries, clean, webhook_url, allowed_updates,
ready=webhook_ready, force_event_loop=force_event_loop)
self.logger.debug('Waiting for Dispatcher and Webhook to start')
webhook_ready.wait()
dispatcher_ready.wait()
# Return the update queue so the main thread can insert updates # Return the update queue so the main thread can insert updates
return self.update_queue return self.update_queue
def _start_polling(self, poll_interval, timeout, read_latency, bootstrap_retries, clean, def _start_polling(self, poll_interval, timeout, read_latency, bootstrap_retries, clean,
allowed_updates): # pragma: no cover allowed_updates, ready=None): # pragma: no cover
# Thread target of thread 'updater'. Runs in background, pulls # Thread target of thread 'updater'. Runs in background, pulls
# updates from Telegram and inserts them in the update queue of the # updates from Telegram and inserts them in the update queue of the
# Dispatcher. # Dispatcher.
@ -354,6 +377,9 @@ class Updater:
# broadcast it # broadcast it
self.update_queue.put(exc) self.update_queue.put(exc)
if ready is not None:
ready.set()
self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates', self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates',
poll_interval) poll_interval)
@ -410,7 +436,7 @@ class Updater:
return current_interval return current_interval
def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean, def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean,
webhook_url, allowed_updates): webhook_url, allowed_updates, ready=None, force_event_loop=False):
self.logger.debug('Updater thread started (webhook)') self.logger.debug('Updater thread started (webhook)')
use_ssl = cert is not None and key is not None use_ssl = cert is not None and key is not None
if not url_path.startswith('/'): if not url_path.startswith('/'):
@ -448,7 +474,7 @@ class Updater:
self.logger.warning("cleaning updates is not supported if " self.logger.warning("cleaning updates is not supported if "
"SSL-termination happens elsewhere; skipping") "SSL-termination happens elsewhere; skipping")
self.httpd.serve_forever() self.httpd.serve_forever(force_event_loop=force_event_loop, ready=ready)
@staticmethod @staticmethod
def _gen_webhook_url(listen, port, url_path): def _gen_webhook_url(listen, port, url_path):

View file

@ -16,10 +16,13 @@
# #
# You should have received a copy of the GNU Lesser Public License # You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/]. # along with this program. If not, see [http://www.gnu.org/licenses/].
import asyncio
import os
import sys import sys
import logging import logging
from telegram import Update from telegram import Update
from threading import Lock from threading import Lock
try: try:
import ujson as json import ujson as json
except ImportError: except ImportError:
@ -41,13 +44,17 @@ class WebhookServer:
self.server_lock = Lock() self.server_lock = Lock()
self.shutdown_lock = Lock() self.shutdown_lock = Lock()
def serve_forever(self): def serve_forever(self, force_event_loop=False, ready=None):
with self.server_lock: with self.server_lock:
IOLoop().make_current()
self.is_running = True self.is_running = True
self.logger.debug('Webhook Server started.') self.logger.debug('Webhook Server started.')
self.http_server.listen(self.port, address=self.listen) self._ensure_event_loop(force_event_loop=force_event_loop)
self.loop = IOLoop.current() self.loop = IOLoop.current()
self.http_server.listen(self.port, address=self.listen)
if ready is not None:
ready.set()
self.loop.start() self.loop.start()
self.logger.debug('Webhook Server stopped.') self.logger.debug('Webhook Server stopped.')
self.is_running = False self.is_running = False
@ -65,6 +72,42 @@ class WebhookServer:
self.logger.debug('Exception happened during processing of request from %s', self.logger.debug('Exception happened during processing of request from %s',
client_address, exc_info=True) client_address, exc_info=True)
def _ensure_event_loop(self, force_event_loop=False):
"""If there's no asyncio event loop set for the current thread - create one."""
try:
loop = asyncio.get_event_loop()
if (not force_event_loop and os.name == 'nt' and sys.version_info >= (3, 8)
and isinstance(loop, asyncio.ProactorEventLoop)):
raise TypeError('`ProactorEventLoop` is incompatible with '
'Tornado. Please switch to `SelectorEventLoop`.')
except RuntimeError:
# Python 3.8 changed default asyncio event loop implementation on windows
# from SelectorEventLoop to ProactorEventLoop. At the time of this writing
# Tornado doesn't support ProactorEventLoop and suggests that end users
# change asyncio event loop policy to WindowsSelectorEventLoopPolicy.
# https://github.com/tornadoweb/tornado/issues/2608
# To avoid changing the global event loop policy, we manually construct
# a SelectorEventLoop instance instead of using asyncio.new_event_loop().
# Note that the fix is not applied in the main thread, as that can break
# user code in even more ways than changing the global event loop policy can,
# and because Updater always starts its webhook server in a separate thread.
# Ideally, we would want to check that Tornado actually raises the expected
# NotImplementedError, but it's not possible to cleanly recover from that
# exception in current Tornado version.
if (os.name == 'nt'
and sys.version_info >= (3, 8)
# OS+version check makes hasattr check redundant, but just to be sure
and hasattr(asyncio, 'WindowsProactorEventLoopPolicy')
and (isinstance(
asyncio.get_event_loop_policy(),
asyncio.WindowsProactorEventLoopPolicy))): # pylint: disable=E1101
self.logger.debug(
'Applying Tornado asyncio event loop fix for Python 3.8+ on Windows')
loop = asyncio.SelectorEventLoop()
else:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class WebhookAppClass(tornado.web.Application): class WebhookAppClass(tornado.web.Application):
@ -74,7 +117,7 @@ class WebhookAppClass(tornado.web.Application):
handlers = [ handlers = [
(r"{}/?".format(webhook_path), WebhookHandler, (r"{}/?".format(webhook_path), WebhookHandler,
self.shared_objects) self.shared_objects)
] # noqa ] # noqa
tornado.web.Application.__init__(self, handlers) tornado.web.Application.__init__(self, handlers)
def log_request(self, handler): def log_request(self, handler):
@ -88,35 +131,6 @@ class WebhookHandler(tornado.web.RequestHandler):
def __init__(self, application, request, **kwargs): def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs) super().__init__(application, request, **kwargs)
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self._init_asyncio_patch()
def _init_asyncio_patch(self):
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
TODO: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
Copied from https://github.com/ipython/ipykernel/pull/456/
"""
if sys.platform.startswith("win") and sys.version_info >= (3, 8):
import asyncio
try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if isinstance(asyncio.get_event_loop_policy(), WindowsProactorEventLoopPolicy):
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
def initialize(self, bot, update_queue, default_quote=None): def initialize(self, bot, update_queue, default_quote=None):
self.bot = bot self.bot = bot

View file

@ -16,11 +16,14 @@
# #
# You should have received a copy of the GNU Lesser Public License # You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/]. # along with this program. If not, see [http://www.gnu.org/licenses/].
import asyncio
import logging import logging
import os import os
import signal import signal
import sys import sys
import asyncio import threading
from contextlib import contextmanager
from flaky import flaky from flaky import flaky
from functools import partial from functools import partial
from queue import Queue from queue import Queue
@ -36,37 +39,28 @@ import pytest
from telegram import TelegramError, Message, User, Chat, Update, Bot from telegram import TelegramError, Message, User, Chat, Update, Bot
from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter
from telegram.ext import Updater, Dispatcher, DictPersistence from telegram.ext import Updater, Dispatcher, DictPersistence
from telegram.utils.webhookhandler import WebhookServer
signalskip = pytest.mark.skipif(sys.platform == 'win32', signalskip = pytest.mark.skipif(sys.platform == 'win32',
reason='Can\'t send signals without stopping ' reason='Can\'t send signals without stopping '
'whole process on windows') 'whole process on windows')
if sys.platform.startswith("win") and sys.version_info >= (3, 8): ASYNCIO_LOCK = threading.Lock()
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows @contextmanager
Pick the older SelectorEventLoopPolicy on Windows def set_asyncio_event_loop(loop):
if the known-incompatible default policy is in use. with ASYNCIO_LOCK:
do this as early as possible to make it a low priority and overrideable try:
ref: https://github.com/tornadoweb/tornado/issues/2608 orig_lop = asyncio.get_event_loop()
TODO: if/when tornado supports the defaults in asyncio, except RuntimeError:
remove and bump tornado requirement for py38 orig_lop = None
Copied from https://github.com/ipython/ipykernel/pull/456/ asyncio.set_event_loop(loop)
""" try:
try: yield
from asyncio import ( finally:
WindowsProactorEventLoopPolicy, asyncio.set_event_loop(orig_lop)
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
class TestUpdater: class TestUpdater:
@ -203,6 +197,106 @@ class TestUpdater:
assert not updater.httpd.is_running assert not updater.httpd.is_running
updater.stop() updater.stop()
def test_start_webhook_no_warning_or_error_logs(self, caplog, updater, monkeypatch):
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
# prevent api calls from @info decorator when updater.bot.id is used in thread names
monkeypatch.setattr(updater.bot, 'bot', User(id=123, first_name='bot', is_bot=True))
monkeypatch.setattr(updater.bot, '_commands', [])
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port
with caplog.at_level(logging.WARNING):
updater.start_webhook(ip, port)
updater.stop()
assert not caplog.records
@pytest.mark.skipif(os.name != 'nt' or sys.version_info < (3, 8),
reason='Workaround only relevant on windows with py3.8+')
def test_start_webhook_ensure_event_loop(self, updater, monkeypatch):
def serve_forever(self, force_event_loop=False, ready=None):
with self.server_lock:
self.is_running = True
self._ensure_event_loop(force_event_loop=force_event_loop)
if ready is not None:
ready.set()
monkeypatch.setattr(WebhookServer, 'serve_forever', serve_forever)
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port
with set_asyncio_event_loop(None):
updater._start_webhook(
ip,
port,
url_path='TOKEN',
cert=None,
key=None,
bootstrap_retries=0,
clean=False,
webhook_url=None,
allowed_updates=None)
assert isinstance(asyncio.get_event_loop(), asyncio.SelectorEventLoop)
@pytest.mark.skipif(os.name != 'nt' or sys.version_info < (3, 8),
reason='Workaround only relevant on windows with py3.8+')
def test_start_webhook_force_event_loop_false(self, updater, monkeypatch):
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port
with set_asyncio_event_loop(asyncio.ProactorEventLoop()):
with pytest.raises(TypeError, match='`ProactorEventLoop` is incompatible'):
updater._start_webhook(
ip,
port,
url_path='TOKEN',
cert=None,
key=None,
bootstrap_retries=0,
clean=False,
webhook_url=None,
allowed_updates=None)
@pytest.mark.skipif(os.name != 'nt' or sys.version_info < (3, 8),
reason='Workaround only relevant on windows with py3.8+')
def test_start_webhook_force_event_loop_true(self, updater, monkeypatch):
def serve_forever(self, force_event_loop=False, ready=None):
with self.server_lock:
self.is_running = True
self._ensure_event_loop(force_event_loop=force_event_loop)
if ready is not None:
ready.set()
monkeypatch.setattr(WebhookServer, 'serve_forever', serve_forever)
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port
with set_asyncio_event_loop(asyncio.ProactorEventLoop()):
updater._start_webhook(
ip,
port,
url_path='TOKEN',
cert=None,
key=None,
bootstrap_retries=0,
clean=False,
webhook_url=None,
allowed_updates=None,
force_event_loop=True)
assert isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop)
def test_webhook_ssl(self, monkeypatch, updater): def test_webhook_ssl(self, monkeypatch, updater):
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True) monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True) monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
@ -267,33 +361,6 @@ class TestUpdater:
assert q.get(False).message.default_quote is True assert q.get(False).message.default_quote is True
updater.stop() updater.stop()
@pytest.mark.skipif(not (sys.platform.startswith("win") and sys.version_info >= (3, 8)),
reason="only relevant on win with py>=3.8")
def test_webhook_tornado_win_py38_workaround(self, updater, monkeypatch):
updater._default_quote = True
q = Queue()
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr('telegram.ext.Dispatcher.process_update', lambda _, u: q.put(u))
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port
updater.start_webhook(
ip,
port,
url_path='TOKEN')
sleep(.2)
try:
from asyncio import (WindowsSelectorEventLoopPolicy)
except ImportError:
pass
# not affected
else:
assert isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy)
updater.stop()
@pytest.mark.parametrize(('error',), @pytest.mark.parametrize(('error',),
argvalues=[(TelegramError(''),)], argvalues=[(TelegramError(''),)],
ids=('TelegramError',)) ids=('TelegramError',))