mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-12-31 16:40:53 +01:00
Keep off some features still in progress
This commit is contained in:
parent
e693922fe8
commit
245238b3a2
5 changed files with 0 additions and 406 deletions
|
@ -1,174 +0,0 @@
|
||||||
from inspect import getmembers, ismethod
|
|
||||||
import threading
|
|
||||||
import logging
|
|
||||||
import telegram
|
|
||||||
import time
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
__all__ = ['CommandHandler', 'CommandHandlerWithHelp']
|
|
||||||
class CommandHandler(object):
|
|
||||||
""" This handles incomming commands and gives an easy way to create commands.
|
|
||||||
|
|
||||||
How to use this:
|
|
||||||
create a new class which inherits this class or CommandHandlerWithHelp.
|
|
||||||
define new methods that start with 'command_' and then the command_name.
|
|
||||||
run run()
|
|
||||||
"""
|
|
||||||
def __init__(self, bot):
|
|
||||||
self.bot = bot # a telegram bot
|
|
||||||
self.isValidCommand = None # a function that returns a boolean and takes one agrument an update. if False is returned the the comaand is not executed.
|
|
||||||
|
|
||||||
def _get_command_func(self, command):
|
|
||||||
if command[0] == '/':
|
|
||||||
command = command[1:]
|
|
||||||
if hasattr(self, 'command_' + command):
|
|
||||||
return self.__getattribute__('command_' + command) # a function
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def run(self, make_thread=True, last_update_id=None, thread_timeout=2, sleep=0.2):
|
|
||||||
"""Continuously check for commands and run the according method
|
|
||||||
|
|
||||||
Args:
|
|
||||||
make_thread:
|
|
||||||
if True make a thread for each command it found.
|
|
||||||
if False make run the code linearly
|
|
||||||
last_update:
|
|
||||||
the offset arg from getUpdates and is kept up to date within this function
|
|
||||||
thread_timeout:
|
|
||||||
The timeout on a thread. If a thread is alive after this period then try to join the thread in
|
|
||||||
the next loop.
|
|
||||||
"""
|
|
||||||
old_threads = []
|
|
||||||
while True:
|
|
||||||
time.sleep(sleep)
|
|
||||||
threads, last_update_id = self.run_once(make_thread=make_thread, last_update_id=last_update_id)
|
|
||||||
for t in threads:
|
|
||||||
t.start()
|
|
||||||
for t in old_threads:
|
|
||||||
threads.append(t)
|
|
||||||
old_threads = []
|
|
||||||
for t in threads:
|
|
||||||
t.join(timeout=thread_timeout)
|
|
||||||
if t.isAlive():
|
|
||||||
old_threads.append(t)
|
|
||||||
|
|
||||||
def run_once(self, make_thread=True, last_update_id=None):
|
|
||||||
""" Check the the messages for commands and make a Thread with the command or run the command depending on make_thread.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
make_thread:
|
|
||||||
True: the function returns a list with threads. Which didn't start yet.
|
|
||||||
False: the function just runs the command it found and returns an empty list.
|
|
||||||
last_update_id:
|
|
||||||
the offset arg from getUpdates and is kept up to date within this function
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A tuple of two elements. The first element is a list with threads which didn't start yet or an empty list if
|
|
||||||
make_threads==False. The second element is the updated las_update_id
|
|
||||||
"""
|
|
||||||
bot_name = self.bot.getMe().username
|
|
||||||
threads = []
|
|
||||||
try:
|
|
||||||
updates = self.bot.getUpdates(offset=last_update_id)
|
|
||||||
except:
|
|
||||||
updates = []
|
|
||||||
for update in updates:
|
|
||||||
last_update_id = update.update_id + 1
|
|
||||||
message = update.message
|
|
||||||
if message.text[0] == '/':
|
|
||||||
command, username = message.text.split(' ')[0], bot_name
|
|
||||||
if '@' in command:
|
|
||||||
command, username = command.split('@')
|
|
||||||
if username == bot_name:
|
|
||||||
command_func = self._get_command_func(command)
|
|
||||||
if command_func is not None:
|
|
||||||
self.bot.sendChatAction(update.message.chat.id,telegram.ChatAction.TYPING)
|
|
||||||
if self.isValidCommand is None or self.isValidCommand(update):
|
|
||||||
if make_thread:
|
|
||||||
t = threading.Thread(target=command_func, args=(update,))
|
|
||||||
threads.append(t)
|
|
||||||
else:
|
|
||||||
command_func(update)
|
|
||||||
else:
|
|
||||||
self._command_not_found(update) # TODO this must be another function.
|
|
||||||
else:
|
|
||||||
if make_thread:
|
|
||||||
t = threading.Thread(target=self._command_not_found, args=(update,))
|
|
||||||
threads.append(t)
|
|
||||||
else:
|
|
||||||
self._command_not_valid(update)
|
|
||||||
return threads, last_update_id
|
|
||||||
|
|
||||||
def _command_not_valid(self, update):
|
|
||||||
"""Inform the telegram user that the command was not found.
|
|
||||||
|
|
||||||
Override this method if you want to do it another way then by sending the the text:
|
|
||||||
Sorry, I didn't understand the command: /command[@bot].
|
|
||||||
"""
|
|
||||||
chat_id = update.message.chat.id
|
|
||||||
reply_to = update.message.message_id
|
|
||||||
message = "Sorry, the command was not authorised or valid: {command}.".format(command=update.message.text.split(' ')[0])
|
|
||||||
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
|
|
||||||
|
|
||||||
def _command_not_found(self, update):
|
|
||||||
"""Inform the telegram user that the command was not found.
|
|
||||||
|
|
||||||
Override this method if you want to do it another way then by sending the the text:
|
|
||||||
Sorry, I didn't understand the command: /command[@bot].
|
|
||||||
"""
|
|
||||||
chat_id = update.message.chat.id
|
|
||||||
reply_to = update.message.message_id
|
|
||||||
message = "Sorry, I didn't understand the command: {command}.".format(command=update.message.text.split(' ')[0])
|
|
||||||
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
|
|
||||||
|
|
||||||
|
|
||||||
class CommandHandlerWithHelp(CommandHandler):
|
|
||||||
""" This CommandHandler has a builtin /help. It grabs the text from the docstrings of command_ functions."""
|
|
||||||
def __init__(self, bot):
|
|
||||||
super(CommandHandlerWithHelp, self).__init__(bot)
|
|
||||||
self._help_title = 'Welcome to {name}.'.format(name=self.bot.getMe().username) # the title of help
|
|
||||||
self._help_before_list = '' # text with information about the bot
|
|
||||||
self._help_after_list = '' # a footer
|
|
||||||
self._help_list_title = 'These are the commands:' # the title of the list
|
|
||||||
self.is_reply = True
|
|
||||||
self.command_start = self.command_help
|
|
||||||
|
|
||||||
def _generate_help(self):
|
|
||||||
""" Generate a string which can be send as a help file.
|
|
||||||
|
|
||||||
This function generates a help file from all the docstrings from the commands.
|
|
||||||
so docstrings of methods that start with command_ should explain what a command does and how a to use the
|
|
||||||
command to the telegram user.
|
|
||||||
"""
|
|
||||||
|
|
||||||
command_functions = [attr[1] for attr in getmembers(self, predicate=ismethod) if attr[0][:8] == 'command_']
|
|
||||||
help_message = self._help_title + '\n\n'
|
|
||||||
help_message += self._help_before_list + '\n\n'
|
|
||||||
help_message += self._help_list_title + '\n'
|
|
||||||
for command_function in command_functions:
|
|
||||||
if command_function.__doc__ is not None:
|
|
||||||
help_message += ' /' + command_function.__name__[8:] + ' - ' + command_function.__doc__ + '\n'
|
|
||||||
else:
|
|
||||||
help_message += ' /' + command_function.__name__[8:] + ' - ' + '\n'
|
|
||||||
help_message += '\n'
|
|
||||||
help_message += self._help_after_list
|
|
||||||
return help_message
|
|
||||||
|
|
||||||
def _command_not_found(self, update):
|
|
||||||
"""Inform the telegram user that the command was not found."""
|
|
||||||
chat_id = update.message.chat.id
|
|
||||||
reply_to = update.message.message_id
|
|
||||||
message = 'Sorry, I did not understand the command: {command}. Please see /help for all available commands'
|
|
||||||
if self.is_reply:
|
|
||||||
self.bot.sendMessage(chat_id, message.format(command=update.message.text.split(' ')[0]),
|
|
||||||
reply_to_message_id=reply_to)
|
|
||||||
else:
|
|
||||||
self.bot.sendMessage(chat_id, message.format(command=update.message.text.split(' ')[0]))
|
|
||||||
|
|
||||||
def command_help(self, update):
|
|
||||||
""" The help file. """
|
|
||||||
chat_id = update.message.chat.id
|
|
||||||
reply_to = update.message.message_id
|
|
||||||
message = self._generate_help()
|
|
||||||
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
|
|
||||||
|
|
|
@ -1,83 +0,0 @@
|
||||||
import telegram
|
|
||||||
|
|
||||||
|
|
||||||
class NoSuchCommandException(BaseException):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class CommandDispatcher:
|
|
||||||
def __init__(self,):
|
|
||||||
self.commands = list()
|
|
||||||
self.default = None
|
|
||||||
|
|
||||||
def addCommand(self, command, callback):
|
|
||||||
self.commands.append((command, callback))
|
|
||||||
|
|
||||||
def setDefault(self, callback):
|
|
||||||
self.default = callback
|
|
||||||
|
|
||||||
def dispatch(self, update):
|
|
||||||
if hasattr(update.message, 'text'):
|
|
||||||
text = update.message.text
|
|
||||||
else:
|
|
||||||
text = ''
|
|
||||||
|
|
||||||
user_id = update.message.from_user.id
|
|
||||||
com = text.split('@')[0]
|
|
||||||
for command, callback in self.commands:
|
|
||||||
if com == command:
|
|
||||||
return callback(command, user_id)
|
|
||||||
if self.default is not None:
|
|
||||||
return self.default(text, user_id)
|
|
||||||
else:
|
|
||||||
raise NoSuchCommandException()
|
|
||||||
|
|
||||||
|
|
||||||
class EnhancedBot(telegram.Bot):
|
|
||||||
"""The Bot class with command dispatcher added.
|
|
||||||
|
|
||||||
>>> bot = EnhancedBot(token=TOKEN)
|
|
||||||
>>> @bot.command('/start')
|
|
||||||
... def start(command, user_id):
|
|
||||||
... # should return a tuple: (text, reply_id, custom_keyboard)
|
|
||||||
... return ("Hello, there! Your id is {}".format(user_id), None, None)
|
|
||||||
>>> while True:
|
|
||||||
... bot.processUpdates()
|
|
||||||
... time.sleep(3)
|
|
||||||
"""
|
|
||||||
def __init__(self, token):
|
|
||||||
self.dispatcher = CommandDispatcher()
|
|
||||||
telegram.Bot.__init__(self, token=token)
|
|
||||||
self.offset = 0 #id of the last processed update
|
|
||||||
|
|
||||||
def command(self, *names, default=False):
|
|
||||||
"""Decorator for adding callbacks for commands."""
|
|
||||||
|
|
||||||
def inner_command(callback):
|
|
||||||
for name in names:
|
|
||||||
self.dispatcher.addCommand(name, callback)
|
|
||||||
if default:
|
|
||||||
self.dispatcher.setDefault(callback)
|
|
||||||
return callback # doesn't touch the callback, so we can use it
|
|
||||||
return inner_command
|
|
||||||
|
|
||||||
def processUpdates(self):
|
|
||||||
updates = self.getUpdates(offset=self.offset)
|
|
||||||
|
|
||||||
for update in updates:
|
|
||||||
print('processing update: {}'.format(str(update.to_dict())))
|
|
||||||
self.offset = update.update_id + 1
|
|
||||||
if not hasattr(update, 'message'):
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
answer, reply_to, reply_markup = self.dispatcher.dispatch(update)
|
|
||||||
except Exception as e:
|
|
||||||
print('error occured') # TODO logging
|
|
||||||
print(update.to_dict())
|
|
||||||
raise e
|
|
||||||
|
|
||||||
if answer is not None:
|
|
||||||
self.sendMessage(chat_id=update.message.chat_id,
|
|
||||||
text=answer,
|
|
||||||
reply_to_message_id=reply_to,
|
|
||||||
reply_markup=reply_markup)
|
|
|
@ -1,74 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
#
|
|
||||||
# A library that provides a Python interface to the Telegram Bot API
|
|
||||||
# Copyright (C) 2015 Leandro Toledo de Souza <leandrotoeldodesouza@gmail.com>
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU Lesser Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU Lesser Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU Lesser Public License
|
|
||||||
# along with this program. If not, see [http://www.gnu.org/licenses/].
|
|
||||||
|
|
||||||
import json
|
|
||||||
try:
|
|
||||||
from urllib.parse import urlencode
|
|
||||||
from urllib.request import urlopen, Request
|
|
||||||
from urllib.error import HTTPError, URLError
|
|
||||||
except ImportError:
|
|
||||||
from urllib import urlencode
|
|
||||||
from urllib2 import urlopen, Request
|
|
||||||
from urllib2 import HTTPError, URLError
|
|
||||||
|
|
||||||
DEFAULT_BASE_URL = \
|
|
||||||
'https://api.botan.io/track?token=%(token)&uid=%(uid)&name=%(name)'
|
|
||||||
USER_AGENT = 'Python Telegram Bot' \
|
|
||||||
' (https://github.com/leandrotoledo/python-telegram-bot)'
|
|
||||||
CONTENT_TYPE = 'application/json'
|
|
||||||
|
|
||||||
class Botan(Object):
|
|
||||||
def __init__(self,
|
|
||||||
token,
|
|
||||||
base_url=None):
|
|
||||||
self.token = token
|
|
||||||
|
|
||||||
if base_url is None:
|
|
||||||
self.base_url = DEFAULT_BASE_URL % {'token': self.token}
|
|
||||||
else:
|
|
||||||
self.base_url = base_url % {'token': self.token}
|
|
||||||
|
|
||||||
def track(self,
|
|
||||||
uid,
|
|
||||||
text,
|
|
||||||
name = 'Message'):
|
|
||||||
|
|
||||||
url = self.base_url % {'uid': uid,
|
|
||||||
'message': text,
|
|
||||||
'name': name}
|
|
||||||
|
|
||||||
self._post(url, message)
|
|
||||||
|
|
||||||
def _post(self,
|
|
||||||
url,
|
|
||||||
data):
|
|
||||||
headers = {'User-agent': USER_AGENT,
|
|
||||||
'Content-type': CONTENT_TYPE}
|
|
||||||
|
|
||||||
try:
|
|
||||||
request = Request(
|
|
||||||
url,
|
|
||||||
data=urlencode(data).encode(),
|
|
||||||
headers=headers
|
|
||||||
)
|
|
||||||
|
|
||||||
return urlopen(request).read()
|
|
||||||
except IOError as e:
|
|
||||||
raise TelegramError(str(e))
|
|
||||||
except HTTPError as e:
|
|
||||||
raise TelegramError(str(e))
|
|
|
@ -1,75 +0,0 @@
|
||||||
import sys
|
|
||||||
sys.path.append('.')
|
|
||||||
import telegram
|
|
||||||
import unittest
|
|
||||||
import unittest.mock
|
|
||||||
from telegram.command_handler import CommandHandler, CommandHandlerWithHelp
|
|
||||||
|
|
||||||
|
|
||||||
class CommandHandlerTmp(CommandHandler):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(CommandHandlerTmp, self).__init__(*args, **kwargs)
|
|
||||||
self.output = None
|
|
||||||
|
|
||||||
def command_test(self, update):
|
|
||||||
self.output = 1
|
|
||||||
|
|
||||||
|
|
||||||
class CommandHandlerTmp2(CommandHandlerWithHelp):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(CommandHandlerTmp2, self).__init__(*args, **kwargs)
|
|
||||||
self.output_test = None
|
|
||||||
|
|
||||||
def command_test(self, update):
|
|
||||||
self.output_test = 1
|
|
||||||
|
|
||||||
|
|
||||||
def fake_getUpdates(*args, **kwargs):
|
|
||||||
from_user = telegram.User(id=42, first_name='hello')
|
|
||||||
message = telegram.Message(message_id=42, from_user=from_user, date=None, chat=from_user, text='/test')
|
|
||||||
update = telegram.Update(update_id=42, message=message)
|
|
||||||
return [update]
|
|
||||||
|
|
||||||
output_fsm = None
|
|
||||||
|
|
||||||
|
|
||||||
def fake_sendMessage(chat_id, message, *args, **kwargs):
|
|
||||||
global output_fsm
|
|
||||||
output_fsm = (chat_id, message)
|
|
||||||
return telegram.Message(43, 123, 000000, telegram.User(chat_id, 'test'), text=message)
|
|
||||||
|
|
||||||
|
|
||||||
class CommandHandlerTest(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self.bot = unittest.mock.MagicMock()
|
|
||||||
self.bot.getUpdates = fake_getUpdates
|
|
||||||
self.bot.sendMessage = fake_sendMessage
|
|
||||||
|
|
||||||
def test_get_command_func(self):
|
|
||||||
CH = CommandHandlerTmp(self.bot)
|
|
||||||
self.assertEqual(CH.command_test, CH._get_command_func('test'))
|
|
||||||
self.assertEqual(CH.command_test, CH._get_command_func('/test'))
|
|
||||||
self.assertEqual(None, CH._get_command_func('this function does not exsist'))
|
|
||||||
|
|
||||||
def test_run_once(self):
|
|
||||||
CH = CommandHandlerTmp(self.bot)
|
|
||||||
self.assertEqual(CH.output, None)
|
|
||||||
threads, last_update = CH.run_once(make_thread=True)
|
|
||||||
for t in threads:
|
|
||||||
t.start()
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
self.assertEqual(CH.output, 1)
|
|
||||||
|
|
||||||
def test_run(self):
|
|
||||||
pass # TODO implement test
|
|
||||||
|
|
||||||
def test__command_not_found(self):
|
|
||||||
CH = CommandHandlerTmp(self.bot)
|
|
||||||
CH._command_not_found(self.bot.getUpdates()[0])
|
|
||||||
self.assertEqual(output_fsm, (42, "Sorry, I didn't understand the command: /test."))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
import sys
|
|
||||||
unittest.main(sys.argv)
|
|
Loading…
Reference in a new issue