python-telegram-bot/telegram/boteventhandler.py

229 lines
7.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
This module contains the class BotEventHandler, which tries to make creating
Telegram Bots intuitive!
"""
import logging
2015-11-16 13:05:57 +01:00
import ssl
from threading import Thread
2015-11-16 20:35:27 +01:00
from time import sleep
2015-11-22 01:03:29 +01:00
import subprocess
from telegram import (Bot, TelegramError, broadcaster, Broadcaster,
NullHandler)
2015-11-16 13:05:57 +01:00
from telegram.utils.webhookhandler import (WebhookServer, WebhookHandler)
# Adjust for differences in Python versions
2015-11-16 13:05:57 +01:00
try:
from Queue import Queue
2015-11-16 13:05:57 +01:00
except ImportError:
from queue import Queue
2015-11-06 00:24:01 +01:00
H = NullHandler()
logging.getLogger(__name__).addHandler(H)
2015-11-06 00:24:01 +01:00
class BotEventHandler:
    """
    This class provides a frontend to telegram.Bot to the programmer, so they
    can focus on coding the bot. It also runs in a separate thread, so the user
    can interact with the bot, for example on the command line. It supports
    Handlers for different kinds of data: Updates from Telegram, basic text
    commands and even arbitrary types.
    Polling as well as webhook are supported.

    Attributes:
        bot: The Bot used to talk to Telegram; taken from ``broadcaster``
            when one is passed in, otherwise created from ``token``.
        update_queue: Queue that fetched updates (and polling errors) are
            put into.
        broadcaster: The Broadcaster that is started and stopped together
            with this handler.
        last_update_id (int): Offset passed to Bot.getUpdates on the next
            poll.
        running (bool): True between a ``start_*`` call and ``stop()``.
        httpd: The webhook server while one is set up, otherwise None.

    Args:
        token (str): The bots token given by the @BotFather
        base_url (Optional[str]): Passed to telegram.Bot when a new Bot is
            created.
        broadcaster (Optional[Broadcaster]): Use present Broadcaster object. If
            None, a new one will be created.
        workers (Optional[int]): Amount of threads in the thread pool for
            functions decorated with @run_async
    """

    def __init__(self, token, base_url=None, broadcaster=None, workers=4):
        if broadcaster:
            # Re-use the bot and queue of the existing Broadcaster so both
            # objects operate on the same state.
            self.bot = broadcaster.bot
            self.update_queue = broadcaster.update_queue
            self.broadcaster = broadcaster
        else:
            self.bot = Bot(token, base_url)
            self.update_queue = Queue()
            self.broadcaster = Broadcaster(self.bot, self.update_queue,
                                           workers=workers)

        self.last_update_id = 0
        self.logger = logging.getLogger(__name__)
        self.running = False
        self.httpd = None

    def _init_threads(self, target, args):
        """
        Starts the 'broadcaster' thread and an 'eventhandler' thread running
        ``target(*args)``, and marks the handler as running.

        Returns:
            Queue: The update queue that can be filled from the main thread
        """
        # Create Thread objects
        broadcaster_thread = Thread(target=self.broadcaster.start,
                                    name="broadcaster")
        event_handler_thread = Thread(target=target,
                                      name="eventhandler",
                                      args=args)

        # The flag has to be set before the threads start, because the
        # polling loop checks it right away.
        self.running = True

        # Start threads
        broadcaster_thread.start()
        event_handler_thread.start()

        # Return the update queue so the main thread can insert updates
        return self.update_queue

    def start_polling(self, poll_interval=1.0, timeout=10, network_delay=2):
        """
        Starts polling updates from Telegram.

        Args:
            poll_interval (Optional[float]): Time to wait between polling
                updates from Telegram in seconds. Default is 1.0
            timeout (Optional[float]): Passed to Bot.getUpdates
            network_delay (Optional[float]): Passed to Bot.getUpdates

        Returns:
            Queue: The update queue that can be filled from the main thread
        """
        return self._init_threads(self._start_polling,
                                  (poll_interval, timeout, network_delay))

    def start_webhook(self, host, port, cert, key, listen='0.0.0.0'):
        """
        Starts a small http server to listen for updates via webhook

        Args:
            host (str): Hostname or IP of the bot
            port (int): Port the bot should be listening on
            cert (str): Path to the SSL certificate file
            key (str): Path to the SSL key file
            listen (Optional[str]): IP-Address to listen on

        Returns:
            Queue: The update queue that can be filled from the main thread
        """
        return self._init_threads(self._start_webhook,
                                  (host, port, cert, key, listen))

    def _start_polling(self, poll_interval, timeout, network_delay):
        """
        Thread target of thread 'eventhandler'. Runs in background, pulls
        updates from Telegram and inserts them in the update queue of the
        Broadcaster.
        """
        current_interval = poll_interval
        self.logger.info('Event Handler thread started')

        # Remove webhook
        self.bot.setWebhook(webhook_url=None)

        while self.running:
            try:
                updates = self.bot.getUpdates(self.last_update_id,
                                              timeout=timeout,
                                              network_delay=network_delay)
                if not self.running:
                    # stop() was called while the request was in flight.
                    # last_update_id is left untouched, so these updates
                    # are requested again with the same offset next time.
                    if updates:
                        self.logger.info('Updates ignored and will be pulled '
                                         'again on restart.')
                    break

                for update in updates:
                    self.update_queue.put(update)
                    self.last_update_id = update.update_id + 1
                    # An update arrived, so drop any error back-off
                    current_interval = poll_interval

                sleep(current_interval)
            except TelegramError as te:
                # Put the error into the update queue and let the Broadcaster
                # broadcast it
                self.update_queue.put(te)
                sleep(current_interval)

                # increase waiting times on subsequent errors up to 30secs.
                # Multiplying by 1.5 (instead of adding interval / 2) keeps
                # the back-off growing for integer intervals on Python 2,
                # where 1 / 2 == 0.
                if current_interval < 30:
                    current_interval = min(current_interval * 1.5, 30)

        self.logger.info('Event Handler thread stopped')

    def _start_webhook(self, host, port, cert, key, listen):
        """
        Thread target of thread 'eventhandler' in webhook mode. Registers the
        webhook URL with Telegram, then serves incoming updates over HTTPS
        until the server is shut down.
        """
        self.logger.info('Event Handler thread started')
        url_base = "https://%s:%d" % (host, port)
        url_path = "/%s" % self.bot.token

        # Remove webhook
        self.bot.setWebhook(webhook_url=None)

        # Set webhook. The certificate file is closed again as soon as the
        # call returns.
        with open(cert, 'rb') as cert_file:
            self.bot.setWebhook(webhook_url=url_base + url_path,
                                certificate=cert_file)

        # Start server
        self.httpd = WebhookServer((listen, port), WebhookHandler,
                                   self.update_queue, url_path)

        # Check SSL-Certificate with openssl, if possible
        try:
            exit_code = subprocess.call(["openssl", "x509", "-text", "-noout",
                                         "-in", cert])
        except OSError:
            # The openssl binary could not be run; skip the check
            exit_code = 0

        if exit_code != 0:
            self.logger.error('SSL certificate check failed, webhook server '
                              'not started')
            # The server never served, so stop() must not try to shut it
            # down. NOTE(review): presumably WebhookServer.shutdown() blocks
            # until serve_forever() exits, as in socketserver - confirm.
            self.httpd = None
            return

        try:
            # NOTE(review): ssl.wrap_socket is deprecated since Python 3.7
            # and removed in 3.12; kept because this module still supports
            # Python 2.
            self.httpd.socket = ssl.wrap_socket(self.httpd.socket,
                                                certfile=cert,
                                                keyfile=key,
                                                server_side=True)
        except ssl.SSLError as error:
            self.logger.error(str(error))
            # Same reasoning as above: nothing is serving, nothing to stop
            self.httpd = None
            return

        self.httpd.serve_forever(poll_interval=1)
        self.logger.info('Event Handler thread stopped')

    def stop(self):
        """
        Stops the polling thread and the broadcaster
        """
        self.logger.info('Stopping Event Handler and Broadcaster...')
        self.running = False

        if self.httpd:
            self.logger.info(
                'Waiting for current webhook connection to be closed... '
                'Send a Telegram message to the bot to exit immediately.')
            self.httpd.shutdown()
            self.httpd = None

        self.logger.debug("Requesting Broadcaster to stop...")
        self.broadcaster.stop()

        # `broadcaster` is the lowercase name imported from telegram at the
        # top of this file, not self.broadcaster.
        # NOTE(review): presumably running_async counts @run_async calls
        # that are still executing - confirm in telegram.broadcaster.
        while broadcaster.running_async > 0:
            sleep(1)

        self.logger.debug("Broadcaster stopped.")