Merge branch 'testing'

This commit is contained in:
leandrotoledo 2015-09-04 23:20:31 -03:00
commit 8f0447156b
8 changed files with 497 additions and 4 deletions

View file

@ -0,0 +1,89 @@
# There could be some unused imports
from inspect import getmembers, ismethod
import threading
import logging
import telegram
import time
from telegram import CommandHandlerWithHelp, CommandHandler
class ExampleCommandHandler(CommandHandlerWithHelp):
"""This is an example how to use a CommandHandlerWithHelp or just a CommandHandler.
If You want to use a CommandHandler it is very easy.
create a class which inherits a CommandHandler.
create a method in this class which start with 'command_' and takes 1 argument: 'update' (which comes directly from
getUpdate()).
If you inherit CommandHandlerWithHelp it also creates a nice /help for you.
"""
def __init__(self, bot): # only necessary for a WithHelp
super(ExampleCommandHandler, self).__init__(bot)
self._help_title = 'Welcome this is a help file!' # optional
self._help_before_list = """
Yeah here I explain some things about this bot.
and of course I can do this in Multiple lines.
""" # default is empty
self._help_list_title = ' These are the available commands:' # optional
self._help_after_list = ' These are some footnotes' # default is empty
self.is_reply = True # default is True
# only necessary if you want to override to default
def _command_not_found(self, update):
"""Inform the telegram user that the command was not found."""
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = "Sorry, I don't know how to do {command}.".format(command=update.message.text.split(' ')[0])
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
# creates /test command. This code gets called when a telegram user enters /test
def command_test(self, update):
""" Test if the server is online. """
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = 'Yeah, the server is online!'
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
# creates /parrot command
def command_parrot(self, update):
""" Says back what you say after the command"""
chat_id = update.message.chat.id
reply_to = update.message.message_id
send = update.message.text.split(' ')
message = update.message.text[len(send[0]):]
if len(send) == 1:
message = '...'
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
# creates /p command
def command_p(self, update):
"""Does the same as parrot."""
return self.command_parrot(update)
# this doesn't create a command.
def another_test(self, update):
""" This won't be called by the CommandHandler.
This is an example of a function that isn't a command in telegram.
Because it didn't start with 'command_'.
"""
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = 'Yeah, this is another test'
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
class Exampe2CommandHandler(CommandHandler):
"""
This is an example of a small working CommandHandler with only one command.
"""
def command_test(self, update):
""" Test if the server is online. """
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = 'Yeah, the server is online!'
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
if __name__ == '__main__':
import telegram
token = '' # use your own token here
Bot = telegram.Bot(token=token)
test_command_handler = ExampleCommandHandler(Bot)
test_command_handler.run()

174
telegram/command_handler.py Normal file
View file

@ -0,0 +1,174 @@
from inspect import getmembers, ismethod
import threading
import logging
import telegram
import time
logger = logging.getLogger(__name__)
__all__ = ['CommandHandler', 'CommandHandlerWithHelp']
class CommandHandler(object):
""" This handles incomming commands and gives an easy way to create commands.
How to use this:
create a new class which inherits this class or CommandHandlerWithHelp.
define new methods that start with 'command_' and then the command_name.
run run()
"""
def __init__(self, bot):
self.bot = bot # a telegram bot
self.isValidCommand = None # a function that returns a boolean and takes one agrument an update. if False is returned the the comaand is not executed.
def _get_command_func(self, command):
if command[0] == '/':
command = command[1:]
if hasattr(self, 'command_' + command):
return self.__getattribute__('command_' + command) # a function
else:
return None
def run(self, make_thread=True, last_update_id=None, thread_timeout=2, sleep=0.2):
"""Continuously check for commands and run the according method
Args:
make_thread:
if True make a thread for each command it found.
if False make run the code linearly
last_update:
the offset arg from getUpdates and is kept up to date within this function
thread_timeout:
The timeout on a thread. If a thread is alive after this period then try to join the thread in
the next loop.
"""
old_threads = []
while True:
time.sleep(sleep)
threads, last_update_id = self.run_once(make_thread=make_thread, last_update_id=last_update_id)
for t in threads:
t.start()
for t in old_threads:
threads.append(t)
old_threads = []
for t in threads:
t.join(timeout=thread_timeout)
if t.isAlive():
old_threads.append(t)
def run_once(self, make_thread=True, last_update_id=None):
""" Check the the messages for commands and make a Thread with the command or run the command depending on make_thread.
Args:
make_thread:
True: the function returns a list with threads. Which didn't start yet.
False: the function just runs the command it found and returns an empty list.
last_update_id:
the offset arg from getUpdates and is kept up to date within this function
Returns:
A tuple of two elements. The first element is a list with threads which didn't start yet or an empty list if
make_threads==False. The second element is the updated las_update_id
"""
bot_name = self.bot.getMe().username
threads = []
try:
updates = self.bot.getUpdates(offset=last_update_id)
except:
updates = []
for update in updates:
last_update_id = update.update_id + 1
message = update.message
if message.text[0] == '/':
command, username = message.text.split(' ')[0], bot_name
if '@' in command:
command, username = command.split('@')
if username == bot_name:
command_func = self._get_command_func(command)
if command_func is not None:
self.bot.sendChatAction(update.message.chat.id,telegram.ChatAction.TYPING)
if self.isValidCommand is None or self.isValidCommand(update):
if make_thread:
t = threading.Thread(target=command_func, args=(update,))
threads.append(t)
else:
command_func(update)
else:
self._command_not_found(update) # TODO this must be another function.
else:
if make_thread:
t = threading.Thread(target=self._command_not_found, args=(update,))
threads.append(t)
else:
self._command_not_valid(update)
return threads, last_update_id
def _command_not_valid(self, update):
"""Inform the telegram user that the command was not found.
Override this method if you want to do it another way then by sending the the text:
Sorry, I didn't understand the command: /command[@bot].
"""
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = "Sorry, the command was not authorised or valid: {command}.".format(command=update.message.text.split(' ')[0])
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
def _command_not_found(self, update):
"""Inform the telegram user that the command was not found.
Override this method if you want to do it another way then by sending the the text:
Sorry, I didn't understand the command: /command[@bot].
"""
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = "Sorry, I didn't understand the command: {command}.".format(command=update.message.text.split(' ')[0])
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)
class CommandHandlerWithHelp(CommandHandler):
""" This CommandHandler has a builtin /help. It grabs the text from the docstrings of command_ functions."""
def __init__(self, bot):
super(CommandHandlerWithHelp, self).__init__(bot)
self._help_title = 'Welcome to {name}.'.format(name=self.bot.getMe().username) # the title of help
self._help_before_list = '' # text with information about the bot
self._help_after_list = '' # a footer
self._help_list_title = 'These are the commands:' # the title of the list
self.is_reply = True
self.command_start = self.command_help
def _generate_help(self):
""" Generate a string which can be send as a help file.
This function generates a help file from all the docstrings from the commands.
so docstrings of methods that start with command_ should explain what a command does and how a to use the
command to the telegram user.
"""
command_functions = [attr[1] for attr in getmembers(self, predicate=ismethod) if attr[0][:8] == 'command_']
help_message = self._help_title + '\n\n'
help_message += self._help_before_list + '\n\n'
help_message += self._help_list_title + '\n'
for command_function in command_functions:
if command_function.__doc__ is not None:
help_message += ' /' + command_function.__name__[8:] + ' - ' + command_function.__doc__ + '\n'
else:
help_message += ' /' + command_function.__name__[8:] + ' - ' + '\n'
help_message += '\n'
help_message += self._help_after_list
return help_message
def _command_not_found(self, update):
"""Inform the telegram user that the command was not found."""
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = 'Sorry, I did not understand the command: {command}. Please see /help for all available commands'
if self.is_reply:
self.bot.sendMessage(chat_id, message.format(command=update.message.text.split(' ')[0]),
reply_to_message_id=reply_to)
else:
self.bot.sendMessage(chat_id, message.format(command=update.message.text.split(' ')[0]))
def command_help(self, update):
""" The help file. """
chat_id = update.message.chat.id
reply_to = update.message.message_id
message = self._generate_help()
self.bot.sendMessage(chat_id, message, reply_to_message_id=reply_to)

83
telegram/enchancedbot.py Normal file
View file

@ -0,0 +1,83 @@
import telegram
class NoSuchCommandException(BaseException):
pass
class CommandDispatcher:
def __init__(self,):
self.commands = list()
self.default = None
def addCommand(self, command, callback):
self.commands.append((command, callback))
def setDefault(self, callback):
self.default = callback
def dispatch(self, update):
if hasattr(update.message, 'text'):
text = update.message.text
else:
text = ''
user_id = update.message.from_user.id
com = text.split('@')[0]
for command, callback in self.commands:
if com == command:
return callback(command, user_id)
if self.default is not None:
return self.default(text, user_id)
else:
raise NoSuchCommandException()
class EnhancedBot(telegram.Bot):
"""The Bot class with command dispatcher added.
>>> bot = EnhancedBot(token=TOKEN)
>>> @bot.command('/start')
... def start(command, user_id):
... # should return a tuple: (text, reply_id, custom_keyboard)
... return ("Hello, there! Your id is {}".format(user_id), None, None)
>>> while True:
... bot.processUpdates()
... time.sleep(3)
"""
def __init__(self, token):
self.dispatcher = CommandDispatcher()
telegram.Bot.__init__(self, token=token)
self.offset = 0 #id of the last processed update
def command(self, *names, default=False):
"""Decorator for adding callbacks for commands."""
def inner_command(callback):
for name in names:
self.dispatcher.addCommand(name, callback)
if default:
self.dispatcher.setDefault(callback)
return callback # doesn't touch the callback, so we can use it
return inner_command
def processUpdates(self):
updates = self.getUpdates(offset=self.offset)
for update in updates:
print('processing update: {}'.format(str(update.to_dict())))
self.offset = update.update_id + 1
if not hasattr(update, 'message'):
continue
try:
answer, reply_to, reply_markup = self.dispatcher.dispatch(update)
except Exception as e:
print('error occured') # TODO logging
print(update.to_dict())
raise e
if answer is not None:
self.sendMessage(chat_id=update.message.chat_id,
text=answer,
reply_to_message_id=reply_to,
reply_markup=reply_markup)

View file

@ -171,7 +171,8 @@ class InputFile(object):
bool
"""
if data:
file_types = ['audio', 'document', 'photo', 'video', 'voice']
file_types = ['audio', 'document', 'photo', 'video', 'voice',
'certificate']
file_type = [i for i in list(data.keys()) if i in file_types]
if file_type:

View file

@ -27,6 +27,3 @@ class ReplyMarkup(TelegramObject):
@staticmethod
def de_json(data):
pass
def to_dict(self):
pass

View file

74
telegram/utils/botan.py Normal file
View file

@ -0,0 +1,74 @@
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015 Leandro Toledo de Souza <leandrotoeldodesouza@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import json
try:
from urllib.parse import urlencode
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError
except ImportError:
from urllib import urlencode
from urllib2 import urlopen, Request
from urllib2 import HTTPError, URLError
DEFAULT_BASE_URL = \
'https://api.botan.io/track?token=%(token)&uid=%(uid)&name=%(name)'
USER_AGENT = 'Python Telegram Bot' \
' (https://github.com/leandrotoledo/python-telegram-bot)'
CONTENT_TYPE = 'application/json'
class Botan(Object):
def __init__(self,
token,
base_url=None):
self.token = token
if base_url is None:
self.base_url = DEFAULT_BASE_URL % {'token': self.token}
else:
self.base_url = base_url % {'token': self.token}
def track(self,
uid,
text,
name = 'Message'):
url = self.base_url % {'uid': uid,
'message': text,
'name': name}
self._post(url, message)
def _post(self,
url,
data):
headers = {'User-agent': USER_AGENT,
'Content-type': CONTENT_TYPE}
try:
request = Request(
url,
data=urlencode(data).encode(),
headers=headers
)
return urlopen(request).read()
except IOError as e:
raise TelegramError(str(e))
except HTTPError as e:
raise TelegramError(str(e))

View file

@ -0,0 +1,75 @@
import sys
sys.path.append('.')
import telegram
import unittest
import unittest.mock
from telegram.command_handler import CommandHandler, CommandHandlerWithHelp
class CommandHandlerTmp(CommandHandler):
def __init__(self, *args, **kwargs):
super(CommandHandlerTmp, self).__init__(*args, **kwargs)
self.output = None
def command_test(self, update):
self.output = 1
class CommandHandlerTmp2(CommandHandlerWithHelp):
def __init__(self, *args, **kwargs):
super(CommandHandlerTmp2, self).__init__(*args, **kwargs)
self.output_test = None
def command_test(self, update):
self.output_test = 1
def fake_getUpdates(*args, **kwargs):
from_user = telegram.User(id=42, first_name='hello')
message = telegram.Message(message_id=42, from_user=from_user, date=None, chat=from_user, text='/test')
update = telegram.Update(update_id=42, message=message)
return [update]
output_fsm = None
def fake_sendMessage(chat_id, message, *args, **kwargs):
global output_fsm
output_fsm = (chat_id, message)
return telegram.Message(43, 123, 000000, telegram.User(chat_id, 'test'), text=message)
class CommandHandlerTest(unittest.TestCase):
def setUp(self):
self.bot = unittest.mock.MagicMock()
self.bot.getUpdates = fake_getUpdates
self.bot.sendMessage = fake_sendMessage
def test_get_command_func(self):
CH = CommandHandlerTmp(self.bot)
self.assertEqual(CH.command_test, CH._get_command_func('test'))
self.assertEqual(CH.command_test, CH._get_command_func('/test'))
self.assertEqual(None, CH._get_command_func('this function does not exsist'))
def test_run_once(self):
CH = CommandHandlerTmp(self.bot)
self.assertEqual(CH.output, None)
threads, last_update = CH.run_once(make_thread=True)
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(CH.output, 1)
def test_run(self):
pass # TODO implement test
def test__command_not_found(self):
CH = CommandHandlerTmp(self.bot)
CH._command_not_found(self.bot.getUpdates()[0])
self.assertEqual(output_fsm, (42, "Sorry, I didn't understand the command: /test."))
if __name__ == '__main__':
import sys
unittest.main(sys.argv)