From 3076dfc0860062f5216ab5a03feb38b21a533b94 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Mon, 30 May 2016 00:26:18 +0300 Subject: [PATCH 01/28] use urllib3 instead of urllib(2) --- requirements.txt | 1 + telegram/utils/request.py | 117 +++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 58 deletions(-) diff --git a/requirements.txt b/requirements.txt index 2c6edea8d..9e7a55731 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ future +urllib3 diff --git a/telegram/utils/request.py b/telegram/utils/request.py index 52fe434d7..8600f5c31 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -18,29 +18,31 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. """This module contains methods to make POST and GET requests""" -import functools import json -import socket -from ssl import SSLError -from future.moves.http.client import HTTPException -from future.moves.urllib.error import HTTPError, URLError -from future.moves.urllib.request import urlopen, urlretrieve, Request +import urllib3 from telegram import (InputFile, TelegramError) from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest +_CON_POOL = None + + +def _get_con_pool(): + if _CON_POOL is not None: + return _CON_POOL + + global _CON_POOL + _CON_POOL = urllib3.PoolManager(10) + return _CON_POOL + def _parse(json_data): - """Try and parse the JSON returned from Telegram and return an empty - dictionary if there is any error. - - Args: - url: - urllib.urlopen object + """Try and parse the JSON returned from Telegram. Returns: - A JSON parsed as Python dict with results. + dict: A JSON parsed as Python dict with results - on error this dict will be empty. + """ decoded_s = json_data.decode('utf-8') try: @@ -54,53 +56,49 @@ def _parse(json_data): return data['result'] -def _try_except_req(func): - """Decorator for requests to handle known exceptions""" +def _request_wrapper(*args, **kwargs): + """Wraps urllib3 request for handling known exceptions. - @functools.wraps(func) - def decorator(*args, **kwargs): - try: - return func(*args, **kwargs) + Args: + args: unnamed arguments, passed to urllib3 request. + kwargs: keyword arguments, passed tp urllib3 request. - except HTTPError as error: - # `HTTPError` inherits from `URLError` so `HTTPError` handling must - # come first. - errcode = error.getcode() + Returns: + str: A non-parsed JSON text. - try: - message = _parse(error.read()) + Raises: + TelegramError - if errcode in (401, 403): - raise Unauthorized() - elif errcode == 400: - raise BadRequest(message) - elif errcode == 502: - raise NetworkError('Bad Gateway') - except ValueError: - message = 'Unknown HTTPError {0}'.format(error.getcode()) + """ - raise NetworkError('{0} ({1})'.format(message, errcode)) + try: + resp = _get_con_pool().request(*args, **kwargs) + except urllib3.exceptions.TimeoutError as error: + raise TimedOut() + except urllib3.exceptions.HTTPError as error: + # HTTPError must come last as its the base urllib3 exception class + # TODO: do something smart here; for now just raise NetowrkError + raise NetworkError('urllib3 HTTPError {0}'.format(error)) - except URLError as error: - raise NetworkError('URLError: {0}'.format(error.reason)) + if 200 <= resp.status <= 299: + # 200-299 range are HTTP success statuses + return resp.data - except (SSLError, socket.timeout) as error: - err_s = str(error) - if 'operation timed out' in err_s: - raise TimedOut() + try: + message = _parse(resp.data) + except ValueError: + raise NetworkError('Unknown HTTPError {0}'.format(resp.status)) - raise NetworkError(err_s) - - except HTTPException as error: - raise NetworkError('HTTPException: {0!r}'.format(error)) - - except socket.error as error: - raise NetworkError('socket.error: {0!r}'.format(error)) - - return decorator + if resp.status in (401, 403): + raise Unauthorized() + elif resp.status == 400: + raise BadRequest(repr(message)) + elif resp.status == 502: + raise NetworkError('Bad Gateway') + else: + raise NetworkError('{0} ({1})'.format(message, resp.status)) -@_try_except_req def get(url): """Request an URL. Args: @@ -109,13 +107,13 @@ def get(url): Returns: A JSON object. + """ - result = urlopen(url).read() + result = _request_wrapper('GET', url) return _parse(result) -@_try_except_req def post(url, data, timeout=None): """Request an URL. Args: @@ -142,16 +140,17 @@ def post(url, data, timeout=None): if InputFile.is_inputfile(data): data = InputFile(data) - request = Request(url, data=data.to_form(), headers=data.headers) + result = _request_wrapper('POST', url, body=data.to_form(), headers=data.headers) else: data = json.dumps(data) - request = Request(url, data=data.encode(), headers={'Content-Type': 'application/json'}) + result = _request_wrapper('POST', + url, + body=data.encode(), + headers={'Content-Type': 'application/json'}) - result = urlopen(request, **urlopen_kwargs).read() return _parse(result) -@_try_except_req def download(url, filename): """Download a file by its URL. Args: @@ -160,6 +159,8 @@ def download(url, filename): filename: The filename within the path to download the file. - """ - urlretrieve(url, filename) + """ + buf = _request_wrapper('GET', url) + with open(filename, 'wb') as fobj: + fobj.write(buf) From b040568b072b5d64fefb308faa71f6d103ad2b74 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Mon, 30 May 2016 00:46:42 +0300 Subject: [PATCH 02/28] test_bot: fix for urllib3 compatibility --- tests/test_bot.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_bot.py b/tests/test_bot.py index db26e330f..5b59b990f 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -20,6 +20,7 @@ """This module contains a object that represents Tests for Telegram Bot""" import io +import re from datetime import datetime import sys @@ -211,10 +212,11 @@ class BotTest(BaseTest, unittest.TestCase): @flaky(3, 1) @timeout(10) def testLeaveChat(self): - with self.assertRaisesRegexp(telegram.error.BadRequest, 'Chat not found'): + regex = re.compile('chat not found', re.IGNORECASE) + with self.assertRaisesRegexp(telegram.error.BadRequest, regex): chat = self._bot.leaveChat(-123456) - with self.assertRaisesRegexp(telegram.error.NetworkError, 'Chat not found'): + with self.assertRaisesRegexp(telegram.error.NetworkError, regex): chat = self._bot.leaveChat(-123456) @flaky(3, 1) From 574fc8cddfd1183df8d7260a0dabc233f3b45429 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Mon, 30 May 2016 00:49:29 +0300 Subject: [PATCH 03/28] urllib3: validate https certificate --- requirements.txt | 1 + telegram/utils/request.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 9e7a55731..678e22607 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ future urllib3 +certifi diff --git a/telegram/utils/request.py b/telegram/utils/request.py index 8600f5c31..5926add9f 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -20,6 +20,7 @@ import json +import certifi import urllib3 from telegram import (InputFile, TelegramError) @@ -33,7 +34,7 @@ def _get_con_pool(): return _CON_POOL global _CON_POOL - _CON_POOL = urllib3.PoolManager(10) + _CON_POOL = urllib3.PoolManager(10, cert_reqs='CERT_REQUIRED', ca_certs=certifi.where()) return _CON_POOL From 57759d8e6d3f003ffbebce3467617947aa33c851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Mon, 30 May 2016 03:16:33 +0200 Subject: [PATCH 04/28] [drunk] use actual thread pool and queue new functions into the pool instead of starting new threads every time --- telegram/ext/dispatcher.py | 55 +++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index c715e6007..f6d068bdb 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -20,10 +20,9 @@ import logging from functools import wraps -from threading import Thread, BoundedSemaphore, Lock, Event, current_thread +from threading import Thread, BoundedSemaphore, Lock, Event # , current_thread from time import sleep - -from queue import Empty +from queue import Queue, Empty from telegram import (TelegramError, NullHandler) from telegram.ext.handler import Handler @@ -32,12 +31,28 @@ from telegram.utils.deprecate import deprecate logging.getLogger(__name__).addHandler(NullHandler()) semaphore = None -async_threads = set() -""":type: set[Thread]""" +async_threads = list() +""":type: list[tuple(Thread, Event, Queue)]""" async_lock = Lock() DEFAULT_GROUP = 0 +def pooled(event, queue): + """ + A wrapper to run a thread in a thread pool + """ + while True: + logging.info('Waiting for function') + func, args, kwargs = queue.get() + logging.info('Got function') + try: + func(*args, **kwargs) + logging.info('Executed function') + finally: + event.clear() + semaphore.release() + + def run_async(func): """ Function decorator that will run the function in a new thread. @@ -52,31 +67,17 @@ def run_async(func): # TODO: handle exception in async threads # set a threading.Event to notify caller thread - @wraps(func) - def pooled(*pargs, **kwargs): - """ - A wrapper to run a thread in a thread pool - """ - try: - result = func(*pargs, **kwargs) - finally: - semaphore.release() - - with async_lock: - async_threads.remove(current_thread()) - return result - @wraps(func) def async_func(*pargs, **kwargs): """ A wrapper to run a function in a thread """ - thread = Thread(target=pooled, args=pargs, kwargs=kwargs) semaphore.acquire() - with async_lock: - async_threads.add(thread) - thread.start() - return thread + for thread, event, queue in async_threads: + if not event.is_set(): + event.set() + queue.put((func, pargs, kwargs)) + break return async_func @@ -110,6 +111,12 @@ class Dispatcher(object): global semaphore if not semaphore: semaphore = BoundedSemaphore(value=workers) + for i in range(workers): + event = Event() + queue = Queue() + thread = Thread(target=pooled, args=(event, queue)) + async_threads.append((thread, event, queue)) + thread.start() else: self.logger.debug('Semaphore already initialized, skipping.') From dd91ce1f399c820ec4583f63b9067257680d8194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Mon, 30 May 2016 13:09:23 +0200 Subject: [PATCH 05/28] use single queue for thread pool, initialize connection pool with n+3 --- telegram/ext/dispatcher.py | 46 +++++++++++++++++++------------------- telegram/ext/updater.py | 4 ++++ telegram/utils/request.py | 5 ++++- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index f6d068bdb..72f206236 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -20,37 +20,45 @@ import logging from functools import wraps -from threading import Thread, BoundedSemaphore, Lock, Event # , current_thread +from threading import Thread, Lock, Event # , current_thread from time import sleep from queue import Queue, Empty from telegram import (TelegramError, NullHandler) +from telegram.utils import request from telegram.ext.handler import Handler from telegram.utils.deprecate import deprecate logging.getLogger(__name__).addHandler(NullHandler()) -semaphore = None -async_threads = list() -""":type: list[tuple(Thread, Event, Queue)]""" +async_queue = Queue() +async_threads = set() +""":type: set[Thread]""" async_lock = Lock() DEFAULT_GROUP = 0 -def pooled(event, queue): +def pooled(): """ A wrapper to run a thread in a thread pool """ while True: logging.info('Waiting for function') - func, args, kwargs = queue.get() + + try: + func, args, kwargs = async_queue.get() + + except TypeError: + break + logging.info('Got function') + try: func(*args, **kwargs) logging.info('Executed function') - finally: - event.clear() - semaphore.release() + + except: + logging.exception("Async function raised exception") def run_async(func): @@ -72,12 +80,7 @@ def run_async(func): """ A wrapper to run a function in a thread """ - semaphore.acquire() - for thread, event, queue in async_threads: - if not event.is_set(): - event.set() - queue.put((func, pargs, kwargs)) - break + async_queue.put((func, pargs, kwargs)) return async_func @@ -108,17 +111,14 @@ class Dispatcher(object): self.__stop_event = Event() self.__exception_event = exception_event or Event() - global semaphore - if not semaphore: - semaphore = BoundedSemaphore(value=workers) + if not len(async_threads): + request.CON_POOL_SIZE = workers + 3 for i in range(workers): - event = Event() - queue = Queue() - thread = Thread(target=pooled, args=(event, queue)) - async_threads.append((thread, event, queue)) + thread = Thread(target=pooled) + async_threads.add(thread) thread.start() else: - self.logger.debug('Semaphore already initialized, skipping.') + self.logger.debug('Thread pool already initialized, skipping.') def start(self): """ diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index df70275e5..2f92fe43f 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -375,6 +375,10 @@ class Updater(object): with dispatcher.async_lock: threads = list(dispatcher.async_threads) total = len(threads) + + for i in range(total): + dispatcher.async_queue.put(0) + for i, thr in enumerate(threads): self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i, total)) thr.join() diff --git a/telegram/utils/request.py b/telegram/utils/request.py index 5926add9f..5145685b6 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -27,6 +27,7 @@ from telegram import (InputFile, TelegramError) from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest _CON_POOL = None +CON_POOL_SIZE = 1 def _get_con_pool(): @@ -34,7 +35,9 @@ def _get_con_pool(): return _CON_POOL global _CON_POOL - _CON_POOL = urllib3.PoolManager(10, cert_reqs='CERT_REQUIRED', ca_certs=certifi.where()) + _CON_POOL = urllib3.PoolManager(CON_POOL_SIZE, + cert_reqs='CERT_REQUIRED', + ca_certs=certifi.where()) return _CON_POOL From 41f6591ac69b5e79306ffc9552ac06267e6e1044 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Mon, 30 May 2016 17:12:27 +0200 Subject: [PATCH 06/28] more sensible logging --- telegram/ext/dispatcher.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index 72f206236..fd6fc58c3 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -20,7 +20,7 @@ import logging from functools import wraps -from threading import Thread, Lock, Event # , current_thread +from threading import Thread, Lock, Event, current_thread from time import sleep from queue import Queue, Empty @@ -43,19 +43,16 @@ def pooled(): A wrapper to run a thread in a thread pool """ while True: - logging.info('Waiting for function') - try: func, args, kwargs = async_queue.get() except TypeError: + logging.debug("Closing run_async thread %s/%d" % + (current_thread().getName(), len(async_threads))) break - logging.info('Got function') - try: func(*args, **kwargs) - logging.info('Executed function') except: logging.exception("Async function raised exception") @@ -114,7 +111,7 @@ class Dispatcher(object): if not len(async_threads): request.CON_POOL_SIZE = workers + 3 for i in range(workers): - thread = Thread(target=pooled) + thread = Thread(target=pooled, name=str(i)) async_threads.add(thread) thread.start() else: From 74283bd41443bfc65e17ab0218dcf1544b32c25c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Mon, 30 May 2016 17:12:50 +0200 Subject: [PATCH 07/28] use HTTPSConnectionPool instead of PoolManager --- telegram/utils/request.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index 5145685b6..f4c2901e4 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -35,9 +35,10 @@ def _get_con_pool(): return _CON_POOL global _CON_POOL - _CON_POOL = urllib3.PoolManager(CON_POOL_SIZE, - cert_reqs='CERT_REQUIRED', - ca_certs=certifi.where()) + _CON_POOL = urllib3.HTTPSConnectionPool(host='api.telegram.org', + maxsize=CON_POOL_SIZE, + cert_reqs='CERT_REQUIRED', + ca_certs=certifi.where()) return _CON_POOL From 6b457bada5bb810012309b8380aef0e48c9c7e31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Tue, 31 May 2016 13:45:43 +0200 Subject: [PATCH 08/28] use keepalive for connection pool --- telegram/utils/request.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index f4c2901e4..4b923df4b 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -19,9 +19,11 @@ """This module contains methods to make POST and GET requests""" import json +import socket import certifi import urllib3 +from urllib3.connection import HTTPConnection from telegram import (InputFile, TelegramError) from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest @@ -35,10 +37,14 @@ def _get_con_pool(): return _CON_POOL global _CON_POOL - _CON_POOL = urllib3.HTTPSConnectionPool(host='api.telegram.org', - maxsize=CON_POOL_SIZE, - cert_reqs='CERT_REQUIRED', - ca_certs=certifi.where()) + _CON_POOL = urllib3.HTTPSConnectionPool( + host='api.telegram.org', + maxsize=CON_POOL_SIZE, + cert_reqs='CERT_REQUIRED', + ca_certs=certifi.where(), + socket_options=HTTPConnection.default_socket_options + [ + (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + ]) return _CON_POOL From 1ff348adbbf924fe16a712c7b6a3499d1b329e37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Tue, 31 May 2016 13:47:43 +0200 Subject: [PATCH 09/28] issue warning if connection pool was initialized before Dispatcher --- telegram/ext/dispatcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index fd6fc58c3..bd4151721 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -109,6 +109,8 @@ class Dispatcher(object): self.__exception_event = exception_event or Event() if not len(async_threads): + if request._CON_POOL: + self.logger.warning("Connection Pool already initialized!") request.CON_POOL_SIZE = workers + 3 for i in range(workers): thread = Thread(target=pooled, name=str(i)) From 78f9bdcac906d0cfdd633321b647095af4eab373 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Wed, 1 Jun 2016 21:05:34 +0300 Subject: [PATCH 10/28] dispatcher: pep8 style fix globals are supposed to be upper case --- telegram/ext/dispatcher.py | 20 ++++++++++---------- telegram/ext/updater.py | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index bd4151721..e71fedf45 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -31,24 +31,24 @@ from telegram.utils.deprecate import deprecate logging.getLogger(__name__).addHandler(NullHandler()) -async_queue = Queue() -async_threads = set() +ASYNC_QUEUE = Queue() +ASYNC_THREADS = set() """:type: set[Thread]""" -async_lock = Lock() +ASYNC_LOCK = Lock() DEFAULT_GROUP = 0 -def pooled(): +def _pooled(): """ A wrapper to run a thread in a thread pool """ while True: try: - func, args, kwargs = async_queue.get() + func, args, kwargs = ASYNC_QUEUE.get() except TypeError: logging.debug("Closing run_async thread %s/%d" % - (current_thread().getName(), len(async_threads))) + (current_thread().getName(), len(ASYNC_THREADS))) break try: @@ -77,7 +77,7 @@ def run_async(func): """ A wrapper to run a function in a thread """ - async_queue.put((func, pargs, kwargs)) + ASYNC_QUEUE.put((func, pargs, kwargs)) return async_func @@ -108,13 +108,13 @@ class Dispatcher(object): self.__stop_event = Event() self.__exception_event = exception_event or Event() - if not len(async_threads): + if not len(ASYNC_THREADS): if request._CON_POOL: self.logger.warning("Connection Pool already initialized!") request.CON_POOL_SIZE = workers + 3 for i in range(workers): - thread = Thread(target=pooled, name=str(i)) - async_threads.add(thread) + thread = Thread(target=_pooled, name=str(i)) + ASYNC_THREADS.add(thread) thread.start() else: self.logger.debug('Thread pool already initialized, skipping.') diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 2f92fe43f..cba48aaf6 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -372,12 +372,12 @@ class Updater(object): self.dispatcher.stop() def _join_async_threads(self): - with dispatcher.async_lock: - threads = list(dispatcher.async_threads) + with dispatcher.ASYNC_LOCK: + threads = list(dispatcher.ASYNC_THREADS) total = len(threads) for i in range(total): - dispatcher.async_queue.put(0) + dispatcher.ASYNC_QUEUE.put(0) for i, thr in enumerate(threads): self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i, total)) From dd8b6219b9829e1d34e39a92880e725ee40fccc7 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Wed, 1 Jun 2016 21:07:48 +0300 Subject: [PATCH 11/28] dispatcher: a little performance improvment --- telegram/ext/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index e71fedf45..e10ccee6f 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -42,7 +42,7 @@ def _pooled(): """ A wrapper to run a thread in a thread pool """ - while True: + while 1: try: func, args, kwargs = ASYNC_QUEUE.get() From c28763c5bee9ce7a5417528af5bed10c2879f862 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Wed, 1 Jun 2016 21:11:00 +0300 Subject: [PATCH 12/28] dispatcher: cosmetic fix --- telegram/ext/dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index e10ccee6f..85180a2db 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -73,11 +73,11 @@ def run_async(func): # set a threading.Event to notify caller thread @wraps(func) - def async_func(*pargs, **kwargs): + def async_func(*args, **kwargs): """ A wrapper to run a function in a thread """ - ASYNC_QUEUE.put((func, pargs, kwargs)) + ASYNC_QUEUE.put((func, args, kwargs)) return async_func From 3608c2bbe5151fe055f89d035ea2b67b16df8eac Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Wed, 1 Jun 2016 21:21:24 +0300 Subject: [PATCH 13/28] dispatcher: if connection pool is already initialized raise exception this will better protect the user from wrong usage --- telegram/ext/dispatcher.py | 7 +++++-- telegram/utils/request.py | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index 85180a2db..31404501a 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -24,6 +24,8 @@ from threading import Thread, Lock, Event, current_thread from time import sleep from queue import Queue, Empty +from future.builtins import range + from telegram import (TelegramError, NullHandler) from telegram.utils import request from telegram.ext.handler import Handler @@ -109,8 +111,9 @@ class Dispatcher(object): self.__exception_event = exception_event or Event() if not len(ASYNC_THREADS): - if request._CON_POOL: - self.logger.warning("Connection Pool already initialized!") + if request.is_con_pool_initialized(): + raise RuntimeError('Connection Pool already initialized') + request.CON_POOL_SIZE = workers + 3 for i in range(workers): thread = Thread(target=_pooled, name=str(i)) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index 4b923df4b..f43482747 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -47,6 +47,9 @@ def _get_con_pool(): ]) return _CON_POOL +def is_con_pool_initialized(): + return _CON_POOL is not None + def _parse(json_data): """Try and parse the JSON returned from Telegram. From 1f5601dae241628b1ec740b09000d2b6b8e1a982 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Wed, 1 Jun 2016 22:38:08 +0300 Subject: [PATCH 14/28] fix SyntaxWarning --- telegram/utils/request.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index f43482747..28cca9e41 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -33,10 +33,11 @@ CON_POOL_SIZE = 1 def _get_con_pool(): + global _CON_POOL + if _CON_POOL is not None: return _CON_POOL - global _CON_POOL _CON_POOL = urllib3.HTTPSConnectionPool( host='api.telegram.org', maxsize=CON_POOL_SIZE, From bda0244ed85657f43a27ed8237a66b68bc4ee7d1 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Fri, 17 Jun 2016 16:52:25 +0300 Subject: [PATCH 15/28] updater: fix print in log --- telegram/ext/updater.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index cba48aaf6..b958d6550 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -380,9 +380,9 @@ class Updater(object): dispatcher.ASYNC_QUEUE.put(0) for i, thr in enumerate(threads): - self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i, total)) + self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total)) thr.join() - self.logger.debug('async thread {0}/{1} has ended'.format(i, total)) + self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total)) def _join_threads(self): for thr in self.__threads: From 881d1d0e25d449b1cbe097dd58004688ceceff03 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Fri, 17 Jun 2016 23:53:18 +0300 Subject: [PATCH 16/28] fix/hack Updater.stop() not working on extreme cases during test_bootstrap_retries_fail() there is an exception raised (by design): TelegramError('test') For a reason I haven't managed to pinpoint the above exception in its precise timing caused the Updater to be left in a state which is 'self.running == False', but the dispatcher threads already initialized. This patch identifies this extreme case and makes sure to go over the stop procedure. --- telegram/ext/updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index b958d6550..7fb19c569 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -346,7 +346,7 @@ class Updater(object): self.job_queue.stop() with self.__lock: - if self.running: + if self.running or dispatcher.ASYNC_THREADS: self.logger.debug('Stopping Updater and Dispatcher...') self.running = False From a30411c9fae8270008a2be03025cf4745d9ad9e1 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Fri, 17 Jun 2016 23:58:22 +0300 Subject: [PATCH 17/28] make sure to remove the stopped dispatcher threads from ASYNC_THREADS --- telegram/ext/updater.py | 1 + 1 file changed, 1 insertion(+) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 7fb19c569..420763e87 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -382,6 +382,7 @@ class Updater(object): for i, thr in enumerate(threads): self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total)) thr.join() + dispatcher.ASYNC_THREADS.remove(thr) self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total)) def _join_threads(self): From e479c7f25e009aa685a374e06b08af6170f26579 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Fri, 17 Jun 2016 23:59:32 +0300 Subject: [PATCH 18/28] type hinting (cosmetic fix) --- telegram/utils/request.py | 1 + 1 file changed, 1 insertion(+) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index 28cca9e41..ec7b8cdc5 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -29,6 +29,7 @@ from telegram import (InputFile, TelegramError) from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest _CON_POOL = None +""":type: urllib3.HTTPSConnectionPool""" CON_POOL_SIZE = 1 From d37b6d6735bda7f01c25906568f90872dc6e3221 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Sat, 18 Jun 2016 00:01:36 +0300 Subject: [PATCH 19/28] make sure to stop Updater after the test_createBot is over --- tests/test_updater.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_updater.py b/tests/test_updater.py index 192416ba1..8f7933d57 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -639,8 +639,8 @@ class UpdaterTest(BaseTest, unittest.TestCase): self.assertFalse(self.updater.running) def test_createBot(self): - updater = Updater('123:abcd') - self.assertIsNotNone(updater.bot) + self.updater = Updater('123:abcd') + self.assertIsNotNone(self.updater.bot) def test_mutualExclusiveTokenBot(self): bot = Bot('123:zyxw') From a814e9de6bdc53268a449edd651c3c316b7283bb Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Sat, 18 Jun 2016 00:50:44 +0300 Subject: [PATCH 20/28] make sure to stop conpool between sensitive unitests --- telegram/utils/request.py | 9 +++++++++ tests/test_jobqueue.py | 3 +++ tests/test_updater.py | 2 ++ 3 files changed, 14 insertions(+) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index ec7b8cdc5..b6f3a780e 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -49,10 +49,19 @@ def _get_con_pool(): ]) return _CON_POOL + def is_con_pool_initialized(): return _CON_POOL is not None +def stop_con_pool(): + global _CON_POOL + if _CON_POOL is not None: + _CON_POOL.close() + _CON_POOL = None + + + def _parse(json_data): """Try and parse the JSON returned from Telegram. diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 4ac90035d..2d3678fcd 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -1,3 +1,4 @@ + #!/usr/bin/env python # encoding: utf-8 # @@ -31,6 +32,7 @@ else: sys.path.append('.') +from telegram.utils.request import stop_con_pool from telegram.ext import JobQueue, Updater from tests.base import BaseTest @@ -58,6 +60,7 @@ class JobQueueTest(BaseTest, unittest.TestCase): def tearDown(self): if self.jq is not None: self.jq.stop() + stop_con_pool() def job1(self, bot): self.result += 1 diff --git a/tests/test_updater.py b/tests/test_updater.py index 8f7933d57..88be926be 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -48,6 +48,7 @@ except ImportError: sys.path.append('.') from telegram import Update, Message, TelegramError, User, Chat, Bot +from telegram.utils.request import stop_con_pool from telegram.ext import * from telegram.ext.dispatcher import run_async from telegram.error import Unauthorized, InvalidToken @@ -84,6 +85,7 @@ class UpdaterTest(BaseTest, unittest.TestCase): def tearDown(self): if self.updater is not None: self.updater.stop() + stop_con_pool() def reset(self): self.message_count = 0 From bc77c845eadd7dcc9990579356381eaba370d444 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Sat, 18 Jun 2016 09:53:08 +0300 Subject: [PATCH 21/28] test_updater: make sure that conpool is stopped before setting updater even for the first unitest, it might come after another unitests from another file which had already init the conpool. --- tests/test_updater.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_updater.py b/tests/test_updater.py index 88be926be..95df1051f 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -79,6 +79,7 @@ class UpdaterTest(BaseTest, unittest.TestCase): self.lock = Lock() def _setup_updater(self, *args, **kwargs): + stop_con_pool() bot = MockBot(*args, **kwargs) self.updater = Updater(workers=2, bot=bot) From fc05d3a62643dd893b0a60da1a7bd7a6f1afee23 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Sat, 18 Jun 2016 19:48:19 +0300 Subject: [PATCH 22/28] switch back to PoolManager telegram servers might send a reponse with HTTP 302 (redirect) to another hostname. in such case HTTPSConnectionPool will fail to do the job --- telegram/utils/request.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index b6f3a780e..ba2d62780 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -29,7 +29,7 @@ from telegram import (InputFile, TelegramError) from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest _CON_POOL = None -""":type: urllib3.HTTPSConnectionPool""" +""":type: urllib3.PoolManager""" CON_POOL_SIZE = 1 @@ -39,8 +39,7 @@ def _get_con_pool(): if _CON_POOL is not None: return _CON_POOL - _CON_POOL = urllib3.HTTPSConnectionPool( - host='api.telegram.org', + _CON_POOL = urllib3.PoolManager( maxsize=CON_POOL_SIZE, cert_reqs='CERT_REQUIRED', ca_certs=certifi.where(), @@ -57,7 +56,7 @@ def is_con_pool_initialized(): def stop_con_pool(): global _CON_POOL if _CON_POOL is not None: - _CON_POOL.close() + _CON_POOL.clear() _CON_POOL = None From 494a7ec1e46cbe8da39b44db755a30962ca5d609 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Sat, 18 Jun 2016 19:57:11 +0300 Subject: [PATCH 23/28] ypaf fixes --- telegram/utils/request.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index ba2d62780..f54264ce9 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -39,13 +39,12 @@ def _get_con_pool(): if _CON_POOL is not None: return _CON_POOL - _CON_POOL = urllib3.PoolManager( - maxsize=CON_POOL_SIZE, - cert_reqs='CERT_REQUIRED', - ca_certs=certifi.where(), - socket_options=HTTPConnection.default_socket_options + [ - (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), - ]) + _CON_POOL = urllib3.PoolManager(maxsize=CON_POOL_SIZE, + cert_reqs='CERT_REQUIRED', + ca_certs=certifi.where(), + socket_options=HTTPConnection.default_socket_options + [ + (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + ]) return _CON_POOL @@ -60,7 +59,6 @@ def stop_con_pool(): _CON_POOL = None - def _parse(json_data): """Try and parse the JSON returned from Telegram. From 5b91194cc7d504376c2c600a5ca7a5aed25d68dc Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Sat, 18 Jun 2016 20:05:10 +0300 Subject: [PATCH 24/28] new yapf version, new cosmetic fixes --- telegram/bot.py | 24 +++++++++++++----------- telegram/ext/updater.py | 7 +++---- telegram/inputfile.py | 6 +++--- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/telegram/bot.py b/telegram/bot.py index 2145bee79..16338bd69 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -618,13 +618,14 @@ class Bot(TelegramObject): @log @message - def sendVenue( - self, chat_id, - latitude, - longitude, - title, address, - foursquare_id=None, - **kwargs): + def sendVenue(self, + chat_id, + latitude, + longitude, + title, + address, + foursquare_id=None, + **kwargs): """ Use this method to send information about a venue. @@ -1132,10 +1133,11 @@ class Bot(TelegramObject): @log @message - def editMessageReplyMarkup( - self, chat_id=None, - message_id=None, inline_message_id=None, - **kwargs): + def editMessageReplyMarkup(self, + chat_id=None, + message_id=None, + inline_message_id=None, + **kwargs): """Use this method to edit only the reply markup of messages sent by the bot or via the bot (for inline bots). diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 420763e87..3fc99e43b 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -285,10 +285,9 @@ class Updater(object): def _check_ssl_cert(self, cert, key): # Check SSL-Certificate with openssl, if possible try: - exit_code = subprocess.call( - ["openssl", "x509", "-text", "-noout", "-in", cert], - stdout=open(os.devnull, 'wb'), - stderr=subprocess.STDOUT) + exit_code = subprocess.call(["openssl", "x509", "-text", "-noout", "-in", cert], + stdout=open(os.devnull, 'wb'), + stderr=subprocess.STDOUT) except OSError: exit_code = 0 if exit_code is 0: diff --git a/telegram/inputfile.py b/telegram/inputfile.py index 8c9276eb5..5ed3c28c9 100644 --- a/telegram/inputfile.py +++ b/telegram/inputfile.py @@ -125,9 +125,9 @@ class InputFile(object): # Add input_file to upload form.extend([ - form_boundary, 'Content-Disposition: form-data; name="%s"; filename="%s"' % ( - self.input_name, self.filename - ), 'Content-Type: %s' % self.mimetype, '', self.input_file_content + form_boundary, 'Content-Disposition: form-data; name="%s"; filename="%s"' % + (self.input_name, self.filename), 'Content-Type: %s' % self.mimetype, '', + self.input_file_content ]) form.append('--' + self.boundary + '--') From 949f4a4fbd43a4ef75bc0763a72cc828bb93a801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Sun, 19 Jun 2016 23:39:00 +0200 Subject: [PATCH 25/28] update requirements: minimum versions of urllib3 and future --- requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 678e22607..d2fc6a44a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -future -urllib3 +future>=0.15.2 +urllib3>=1.8.3 certifi From 703bece155fd56c0ead2cf303369b67d6d965bea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Sun, 19 Jun 2016 23:40:34 +0200 Subject: [PATCH 26/28] set loglevel of urllib3 to WARNING by default --- telegram/utils/request.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/telegram/utils/request.py b/telegram/utils/request.py index f54264ce9..f801c54a3 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -20,6 +20,7 @@ import json import socket +import logging import certifi import urllib3 @@ -32,6 +33,8 @@ _CON_POOL = None """:type: urllib3.PoolManager""" CON_POOL_SIZE = 1 +logging.getLogger('urllib3').setLevel(logging.WARNING) + def _get_con_pool(): global _CON_POOL @@ -99,7 +102,7 @@ def _request_wrapper(*args, **kwargs): raise TimedOut() except urllib3.exceptions.HTTPError as error: # HTTPError must come last as its the base urllib3 exception class - # TODO: do something smart here; for now just raise NetowrkError + # TODO: do something smart here; for now just raise NetworkError raise NetworkError('urllib3 HTTPError {0}'.format(error)) if 200 <= resp.status <= 299: From 7635bc0eec7650c6a423f1977348a31005c9823f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Sun, 19 Jun 2016 23:46:34 +0200 Subject: [PATCH 27/28] comments, lock thread pool, while 1 and snake_case everywhere --- telegram/ext/dispatcher.py | 42 ++++++++++++++++++++------------------ telegram/ext/updater.py | 31 ++++++++++++++-------------- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index 31404501a..25298262b 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -36,7 +36,7 @@ logging.getLogger(__name__).addHandler(NullHandler()) ASYNC_QUEUE = Queue() ASYNC_THREADS = set() """:type: set[Thread]""" -ASYNC_LOCK = Lock() +ASYNC_LOCK = Lock() # guards ASYNC_THREADS DEFAULT_GROUP = 0 @@ -48,16 +48,17 @@ def _pooled(): try: func, args, kwargs = ASYNC_QUEUE.get() + # If unpacking fails, the thread pool is being closed from Updater._join_async_threads except TypeError: - logging.debug("Closing run_async thread %s/%d" % - (current_thread().getName(), len(ASYNC_THREADS))) + logging.getLogger(__name__).debug("Closing run_async thread %s/%d" % + (current_thread().getName(), len(ASYNC_THREADS))) break try: func(*args, **kwargs) except: - logging.exception("Async function raised exception") + logging.getLogger(__name__).exception("run_async function raised exception") def run_async(func): @@ -110,17 +111,18 @@ class Dispatcher(object): self.__stop_event = Event() self.__exception_event = exception_event or Event() - if not len(ASYNC_THREADS): - if request.is_con_pool_initialized(): - raise RuntimeError('Connection Pool already initialized') + with ASYNC_LOCK: + if not ASYNC_THREADS: + if request.is_con_pool_initialized(): + raise RuntimeError('Connection Pool already initialized') - request.CON_POOL_SIZE = workers + 3 - for i in range(workers): - thread = Thread(target=_pooled, name=str(i)) - ASYNC_THREADS.add(thread) - thread.start() - else: - self.logger.debug('Thread pool already initialized, skipping.') + request.CON_POOL_SIZE = workers + 3 + for i in range(workers): + thread = Thread(target=_pooled, name=str(i)) + ASYNC_THREADS.add(thread) + thread.start() + else: + self.logger.debug('Thread pool already initialized, skipping.') def start(self): """ @@ -140,7 +142,7 @@ class Dispatcher(object): self.running = True self.logger.debug('Dispatcher started') - while True: + while 1: try: # Pop update from update queue. update = self.update_queue.get(True, 1) @@ -154,7 +156,7 @@ class Dispatcher(object): continue self.logger.debug('Processing Update: %s' % update) - self.processUpdate(update) + self.process_update(update) self.running = False self.logger.debug('Dispatcher thread stopped') @@ -169,7 +171,7 @@ class Dispatcher(object): sleep(0.1) self.__stop_event.clear() - def processUpdate(self, update): + def process_update(self, update): """ Processes a single update. @@ -179,7 +181,7 @@ class Dispatcher(object): # An error happened while polling if isinstance(update, TelegramError): - self.dispatchError(None, update) + self.dispatch_error(None, update) else: for group in self.groups: @@ -194,7 +196,7 @@ class Dispatcher(object): 'Update.') try: - self.dispatchError(update, te) + self.dispatch_error(update, te) except Exception: self.logger.exception('An uncaught error was raised while ' 'handling the error') @@ -280,7 +282,7 @@ class Dispatcher(object): if callback in self.error_handlers: self.error_handlers.remove(callback) - def dispatchError(self, update, error): + def dispatch_error(self, update, error): """ Dispatches an error. diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 3fc99e43b..3c3a190c4 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -285,9 +285,10 @@ class Updater(object): def _check_ssl_cert(self, cert, key): # Check SSL-Certificate with openssl, if possible try: - exit_code = subprocess.call(["openssl", "x509", "-text", "-noout", "-in", cert], - stdout=open(os.devnull, 'wb'), - stderr=subprocess.STDOUT) + exit_code = subprocess.call( + ["openssl", "x509", "-text", "-noout", "-in", cert], + stdout=open(os.devnull, 'wb'), + stderr=subprocess.STDOUT) except OSError: exit_code = 0 if exit_code is 0: @@ -308,7 +309,7 @@ class Updater(object): def _bootstrap(self, max_retries, clean, webhook_url, cert=None): retries = 0 - while True: + while 1: try: if clean: @@ -353,9 +354,8 @@ class Updater(object): self._stop_httpd() self._stop_dispatcher() self._join_threads() - # async threads must be join()ed only after the dispatcher - # thread was joined, otherwise we can still have new async - # threads dispatched + # async threads must be join()ed only after the dispatcher thread was joined, + # otherwise we can still have new async threads dispatched self._join_async_threads() def _stop_httpd(self): @@ -373,16 +373,17 @@ class Updater(object): def _join_async_threads(self): with dispatcher.ASYNC_LOCK: threads = list(dispatcher.ASYNC_THREADS) - total = len(threads) + total = len(threads) - for i in range(total): - dispatcher.ASYNC_QUEUE.put(0) + # Stop all threads in the thread pool by put()ting one non-tuple per thread + for i in range(total): + dispatcher.ASYNC_QUEUE.put(None) - for i, thr in enumerate(threads): - self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total)) - thr.join() - dispatcher.ASYNC_THREADS.remove(thr) - self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total)) + for i, thr in enumerate(threads): + self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total)) + thr.join() + dispatcher.ASYNC_THREADS.remove(thr) + self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total)) def _join_threads(self): for thr in self.__threads: From b41f7e3e79ea9211275134ba0f30ddd50c5cafe3 Mon Sep 17 00:00:00 2001 From: leandrotoledo Date: Sun, 19 Jun 2016 17:50:02 -0400 Subject: [PATCH 28/28] Code style with latest yapf --- telegram/bot.py | 24 +++++++++++------------- telegram/inputfile.py | 6 +++--- tests/test_jobqueue.py | 1 - 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/telegram/bot.py b/telegram/bot.py index 16338bd69..2145bee79 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -618,14 +618,13 @@ class Bot(TelegramObject): @log @message - def sendVenue(self, - chat_id, - latitude, - longitude, - title, - address, - foursquare_id=None, - **kwargs): + def sendVenue( + self, chat_id, + latitude, + longitude, + title, address, + foursquare_id=None, + **kwargs): """ Use this method to send information about a venue. @@ -1133,11 +1132,10 @@ class Bot(TelegramObject): @log @message - def editMessageReplyMarkup(self, - chat_id=None, - message_id=None, - inline_message_id=None, - **kwargs): + def editMessageReplyMarkup( + self, chat_id=None, + message_id=None, inline_message_id=None, + **kwargs): """Use this method to edit only the reply markup of messages sent by the bot or via the bot (for inline bots). diff --git a/telegram/inputfile.py b/telegram/inputfile.py index 5ed3c28c9..8c9276eb5 100644 --- a/telegram/inputfile.py +++ b/telegram/inputfile.py @@ -125,9 +125,9 @@ class InputFile(object): # Add input_file to upload form.extend([ - form_boundary, 'Content-Disposition: form-data; name="%s"; filename="%s"' % - (self.input_name, self.filename), 'Content-Type: %s' % self.mimetype, '', - self.input_file_content + form_boundary, 'Content-Disposition: form-data; name="%s"; filename="%s"' % ( + self.input_name, self.filename + ), 'Content-Type: %s' % self.mimetype, '', self.input_file_content ]) form.append('--' + self.boundary + '--') diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 2d3678fcd..8cadd8e1d 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -1,4 +1,3 @@ - #!/usr/bin/env python # encoding: utf-8 #