mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2025-01-18 15:20:42 +01:00
remove all other code to reduce testing
This commit is contained in:
parent
6b2c229a49
commit
4566a1debd
1 changed files with 3 additions and 388 deletions
|
@ -93,247 +93,7 @@ class TestUpdater:
|
|||
self.received = update.message.text
|
||||
self.cb_handler_called.set()
|
||||
|
||||
@pytest.mark.parametrize(('error',),
|
||||
argvalues=[(TelegramError('Test Error 2'),),
|
||||
(Unauthorized('Test Unauthorized'),)],
|
||||
ids=('TelegramError', 'Unauthorized'))
|
||||
def test_get_updates_normal_err(self, monkeypatch, updater, error):
|
||||
def test(*args, **kwargs):
|
||||
raise error
|
||||
|
||||
monkeypatch.setattr(updater.bot, 'get_updates', test)
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
updater.dispatcher.add_error_handler(self.error_handler)
|
||||
updater.start_polling(0.01)
|
||||
|
||||
# Make sure that the error handler was called
|
||||
self.err_handler_called.wait()
|
||||
assert self.received == error.message
|
||||
|
||||
# Make sure that Updater polling thread keeps running
|
||||
self.err_handler_called.clear()
|
||||
self.err_handler_called.wait()
|
||||
|
||||
def test_get_updates_bailout_err(self, monkeypatch, updater, caplog):
|
||||
error = InvalidToken()
|
||||
|
||||
def test(*args, **kwargs):
|
||||
raise error
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
monkeypatch.setattr(updater.bot, 'get_updates', test)
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
updater.dispatcher.add_error_handler(self.error_handler)
|
||||
updater.start_polling(0.01)
|
||||
assert self.err_handler_called.wait(1) is not True
|
||||
|
||||
sleep(1)
|
||||
# NOTE: This test might hit a race condition and fail (though the 1 seconds delay above
|
||||
# should work around it).
|
||||
# NOTE: Checking Updater.running is problematic because it is not set to False when there's
|
||||
# an unhandled exception.
|
||||
# TODO: We should have a way to poll Updater status and decide if it's running or not.
|
||||
import pprint
|
||||
pprint.pprint([rec.getMessage() for rec in caplog.get_records('call')])
|
||||
assert any('unhandled exception in Bot:{}:updater'.format(updater.bot.id) in
|
||||
rec.getMessage() for rec in caplog.get_records('call'))
|
||||
|
||||
@pytest.mark.parametrize(('error',),
|
||||
argvalues=[(RetryAfter(0.01),),
|
||||
(TimedOut(),)],
|
||||
ids=('RetryAfter', 'TimedOut'))
|
||||
def test_get_updates_retries(self, monkeypatch, updater, error):
|
||||
event = Event()
|
||||
|
||||
def test(*args, **kwargs):
|
||||
event.set()
|
||||
raise error
|
||||
|
||||
monkeypatch.setattr(updater.bot, 'get_updates', test)
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
updater.dispatcher.add_error_handler(self.error_handler)
|
||||
updater.start_polling(0.01)
|
||||
|
||||
# Make sure that get_updates was called, but not the error handler
|
||||
event.wait()
|
||||
assert self.err_handler_called.wait(0.5) is not True
|
||||
assert self.received != error.message
|
||||
|
||||
# Make sure that Updater polling thread keeps running
|
||||
event.clear()
|
||||
event.wait()
|
||||
assert self.err_handler_called.wait(0.5) is not True
|
||||
|
||||
def test_webhook(self, monkeypatch, updater):
|
||||
q = Queue()
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr('telegram.ext.Dispatcher.process_update', lambda _, u: q.put(u))
|
||||
|
||||
ip = '127.0.0.1'
|
||||
port = randrange(1024, 49152) # Select random port
|
||||
updater.start_webhook(
|
||||
ip,
|
||||
port,
|
||||
url_path='TOKEN')
|
||||
sleep(.2)
|
||||
try:
|
||||
# Now, we send an update to the server via urlopen
|
||||
update = Update(1, message=Message(1, User(1, '', False), None, Chat(1, ''),
|
||||
text='Webhook'))
|
||||
self._send_webhook_msg(ip, port, update.to_json(), 'TOKEN')
|
||||
sleep(.2)
|
||||
assert q.get(False) == update
|
||||
|
||||
# Returns 404 if path is incorrect
|
||||
with pytest.raises(HTTPError) as excinfo:
|
||||
self._send_webhook_msg(ip, port, None, 'webookhandler.py')
|
||||
assert excinfo.value.code == 404
|
||||
|
||||
with pytest.raises(HTTPError) as excinfo:
|
||||
self._send_webhook_msg(ip, port, None, 'webookhandler.py',
|
||||
get_method=lambda: 'HEAD')
|
||||
assert excinfo.value.code == 404
|
||||
|
||||
# Test multiple shutdown() calls
|
||||
updater.httpd.shutdown()
|
||||
finally:
|
||||
updater.httpd.shutdown()
|
||||
sleep(.2)
|
||||
assert not updater.httpd.is_running
|
||||
updater.stop()
|
||||
|
||||
def test_webhook_ssl(self, monkeypatch, updater):
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
|
||||
ip = '127.0.0.1'
|
||||
port = randrange(1024, 49152) # Select random port
|
||||
tg_err = False
|
||||
try:
|
||||
updater._start_webhook(
|
||||
ip,
|
||||
port,
|
||||
url_path='TOKEN',
|
||||
cert='./tests/test_updater.py',
|
||||
key='./tests/test_updater.py',
|
||||
bootstrap_retries=0,
|
||||
clean=False,
|
||||
webhook_url=None,
|
||||
allowed_updates=None)
|
||||
except TelegramError:
|
||||
tg_err = True
|
||||
assert tg_err
|
||||
|
||||
def test_webhook_no_ssl(self, monkeypatch, updater):
|
||||
q = Queue()
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr('telegram.ext.Dispatcher.process_update', lambda _, u: q.put(u))
|
||||
|
||||
ip = '127.0.0.1'
|
||||
port = randrange(1024, 49152) # Select random port
|
||||
updater.start_webhook(ip, port, webhook_url=None)
|
||||
sleep(.2)
|
||||
|
||||
# Now, we send an update to the server via urlopen
|
||||
update = Update(1, message=Message(1, User(1, '', False), None, Chat(1, ''),
|
||||
text='Webhook 2'))
|
||||
self._send_webhook_msg(ip, port, update.to_json())
|
||||
sleep(.2)
|
||||
assert q.get(False) == update
|
||||
updater.stop()
|
||||
|
||||
def test_webhook_default_quote(self, monkeypatch, updater):
|
||||
updater._default_quote = True
|
||||
q = Queue()
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr('telegram.ext.Dispatcher.process_update', lambda _, u: q.put(u))
|
||||
|
||||
ip = '127.0.0.1'
|
||||
port = randrange(1024, 49152) # Select random port
|
||||
updater.start_webhook(
|
||||
ip,
|
||||
port,
|
||||
url_path='TOKEN')
|
||||
sleep(.2)
|
||||
|
||||
# Now, we send an update to the server via urlopen
|
||||
update = Update(1, message=Message(1, User(1, '', False), None, Chat(1, ''),
|
||||
text='Webhook'))
|
||||
self._send_webhook_msg(ip, port, update.to_json(), 'TOKEN')
|
||||
sleep(.2)
|
||||
# assert q.get(False) == update
|
||||
assert q.get(False).message.default_quote is True
|
||||
updater.stop()
|
||||
|
||||
@pytest.mark.skipif(not (sys.platform.startswith("win") and sys.version_info >= (3, 8)),
|
||||
reason="only relevant on win with py>=3.8")
|
||||
def test_webhook_tornado_win_py38_workaround(self, updater, monkeypatch):
|
||||
updater._default_quote = True
|
||||
q = Queue()
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
|
||||
monkeypatch.setattr('telegram.ext.Dispatcher.process_update', lambda _, u: q.put(u))
|
||||
|
||||
ip = '127.0.0.1'
|
||||
port = randrange(1024, 49152) # Select random port
|
||||
updater.start_webhook(
|
||||
ip,
|
||||
port,
|
||||
url_path='TOKEN')
|
||||
sleep(.2)
|
||||
|
||||
try:
|
||||
from asyncio import (WindowsSelectorEventLoopPolicy)
|
||||
except ImportError:
|
||||
pass
|
||||
# not affected
|
||||
else:
|
||||
assert isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy)
|
||||
|
||||
updater.stop()
|
||||
|
||||
@pytest.mark.parametrize(('error',),
|
||||
argvalues=[(TelegramError(''),)],
|
||||
ids=('TelegramError',))
|
||||
def test_bootstrap_retries_success(self, monkeypatch, updater, error):
|
||||
retries = 2
|
||||
|
||||
def attempt(*args, **kwargs):
|
||||
if self.attempts < retries:
|
||||
self.attempts += 1
|
||||
raise error
|
||||
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', attempt)
|
||||
|
||||
updater.running = True
|
||||
updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0)
|
||||
assert self.attempts == retries
|
||||
|
||||
@pytest.mark.parametrize(('error', 'attempts'),
|
||||
argvalues=[(TelegramError(''), 2),
|
||||
(Unauthorized(''), 1),
|
||||
(InvalidToken(), 1)],
|
||||
ids=('TelegramError', 'Unauthorized', 'InvalidToken'))
|
||||
def test_bootstrap_retries_error(self, monkeypatch, updater, error, attempts):
|
||||
retries = 1
|
||||
|
||||
def attempt(*args, **kwargs):
|
||||
self.attempts += 1
|
||||
raise error
|
||||
|
||||
monkeypatch.setattr(updater.bot, 'set_webhook', attempt)
|
||||
|
||||
updater.running = True
|
||||
with pytest.raises(type(error)):
|
||||
updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0)
|
||||
assert self.attempts == attempts
|
||||
|
||||
@pytest.mark.parametrize(('error', ),
|
||||
argvalues=[(TelegramError(''),)],
|
||||
ids=('TelegramError', ))
|
||||
def test_bootstrap_clean(self, monkeypatch, updater, error):
|
||||
def test_bootstrap_clean(self, monkeypatch, updater):
|
||||
clean = True
|
||||
expected_id = 4 # max 9 otherwise we hit our inf loop protection
|
||||
self.update_id = 0
|
||||
|
@ -350,7 +110,7 @@ class TestUpdater:
|
|||
# case 2
|
||||
if len(args) > 0:
|
||||
self.update_id = int(args[0])
|
||||
raise error
|
||||
raise ValueError('bla')
|
||||
|
||||
class FakeUpdate():
|
||||
def __init__(self, update_id):
|
||||
|
@ -376,151 +136,6 @@ class TestUpdater:
|
|||
monkeypatch.setattr(updater.bot, 'get_updates', get_updates)
|
||||
|
||||
updater.running = True
|
||||
with pytest.raises(type(error)):
|
||||
with pytest.raises(ValueError, match='bla'):
|
||||
updater._bootstrap(1, clean, None, None, bootstrap_interval=0)
|
||||
assert self.update_id == expected_id
|
||||
|
||||
@flaky(3, 1)
|
||||
def test_webhook_invalid_posts(self, updater):
|
||||
ip = '127.0.0.1'
|
||||
port = randrange(1024, 49152) # select random port for travis
|
||||
thr = Thread(
|
||||
target=updater._start_webhook,
|
||||
args=(ip, port, '', None, None, 0, False, None, None))
|
||||
thr.start()
|
||||
|
||||
sleep(.2)
|
||||
|
||||
try:
|
||||
with pytest.raises(HTTPError) as excinfo:
|
||||
self._send_webhook_msg(ip, port, '<root><bla>data</bla></root>',
|
||||
content_type='application/xml')
|
||||
assert excinfo.value.code == 403
|
||||
|
||||
with pytest.raises(HTTPError) as excinfo:
|
||||
self._send_webhook_msg(ip, port, 'dummy-payload', content_len=-2)
|
||||
assert excinfo.value.code == 500
|
||||
|
||||
# TODO: prevent urllib or the underlying from adding content-length
|
||||
# with pytest.raises(HTTPError) as excinfo:
|
||||
# self._send_webhook_msg(ip, port, 'dummy-payload', content_len=None)
|
||||
# assert excinfo.value.code == 411
|
||||
|
||||
with pytest.raises(HTTPError):
|
||||
self._send_webhook_msg(ip, port, 'dummy-payload', content_len='not-a-number')
|
||||
assert excinfo.value.code == 500
|
||||
|
||||
finally:
|
||||
updater.httpd.shutdown()
|
||||
thr.join()
|
||||
|
||||
def _send_webhook_msg(self,
|
||||
ip,
|
||||
port,
|
||||
payload_str,
|
||||
url_path='',
|
||||
content_len=-1,
|
||||
content_type='application/json',
|
||||
get_method=None):
|
||||
headers = {'content-type': content_type, }
|
||||
|
||||
if not payload_str:
|
||||
content_len = None
|
||||
payload = None
|
||||
else:
|
||||
payload = bytes(payload_str, encoding='utf-8')
|
||||
|
||||
if content_len == -1:
|
||||
content_len = len(payload)
|
||||
|
||||
if content_len is not None:
|
||||
headers['content-length'] = str(content_len)
|
||||
|
||||
url = 'http://{ip}:{port}/{path}'.format(ip=ip, port=port, path=url_path)
|
||||
|
||||
req = Request(url, data=payload, headers=headers)
|
||||
|
||||
if get_method is not None:
|
||||
req.get_method = get_method
|
||||
|
||||
return urlopen(req)
|
||||
|
||||
def signal_sender(self, updater):
|
||||
sleep(0.2)
|
||||
while not updater.running:
|
||||
sleep(0.2)
|
||||
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
|
||||
@signalskip
|
||||
def test_idle(self, updater, caplog):
|
||||
updater.start_polling(0.01)
|
||||
Thread(target=partial(self.signal_sender, updater=updater)).start()
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
updater.idle()
|
||||
|
||||
rec = caplog.records[-1]
|
||||
assert rec.msg.startswith('Received signal {}'.format(signal.SIGTERM))
|
||||
assert rec.levelname == 'INFO'
|
||||
|
||||
# If we get this far, idle() ran through
|
||||
sleep(.5)
|
||||
assert updater.running is False
|
||||
|
||||
@signalskip
|
||||
def test_user_signal(self, updater):
|
||||
temp_var = {'a': 0}
|
||||
|
||||
def user_signal_inc(signum, frame):
|
||||
temp_var['a'] = 1
|
||||
|
||||
updater.user_sig_handler = user_signal_inc
|
||||
updater.start_polling(0.01)
|
||||
Thread(target=partial(self.signal_sender, updater=updater)).start()
|
||||
updater.idle()
|
||||
# If we get this far, idle() ran through
|
||||
sleep(.5)
|
||||
assert updater.running is False
|
||||
assert temp_var['a'] != 0
|
||||
|
||||
def test_create_bot(self):
|
||||
updater = Updater('123:abcd')
|
||||
assert updater.bot is not None
|
||||
|
||||
def test_mutual_exclude_token_bot(self):
|
||||
bot = Bot('123:zyxw')
|
||||
with pytest.raises(ValueError):
|
||||
Updater(token='123:abcd', bot=bot)
|
||||
|
||||
def test_no_token_or_bot_or_dispatcher(self):
|
||||
with pytest.raises(ValueError):
|
||||
Updater()
|
||||
|
||||
def test_mutual_exclude_bot_private_key(self):
|
||||
bot = Bot('123:zyxw')
|
||||
with pytest.raises(ValueError):
|
||||
Updater(bot=bot, private_key=b'key')
|
||||
|
||||
def test_mutual_exclude_bot_dispatcher(self):
|
||||
dispatcher = Dispatcher(None, None)
|
||||
bot = Bot('123:zyxw')
|
||||
with pytest.raises(ValueError):
|
||||
Updater(bot=bot, dispatcher=dispatcher)
|
||||
|
||||
def test_mutual_exclude_persistence_dispatcher(self):
|
||||
dispatcher = Dispatcher(None, None)
|
||||
persistence = DictPersistence()
|
||||
with pytest.raises(ValueError):
|
||||
Updater(dispatcher=dispatcher, persistence=persistence)
|
||||
|
||||
def test_mutual_exclude_workers_dispatcher(self):
|
||||
dispatcher = Dispatcher(None, None)
|
||||
with pytest.raises(ValueError):
|
||||
Updater(dispatcher=dispatcher, workers=8)
|
||||
|
||||
def test_mutual_exclude_use_context_dispatcher(self):
|
||||
dispatcher = Dispatcher(None, None)
|
||||
use_context = not dispatcher.use_context
|
||||
with pytest.raises(ValueError):
|
||||
Updater(dispatcher=dispatcher, use_context=use_context)
|
||||
|
|
Loading…
Reference in a new issue