mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-12-22 14:35:00 +01:00
Revert "Feature/requests"
This commit is contained in:
parent
026673dc05
commit
f623db06ea
13 changed files with 579 additions and 124 deletions
19
CHANGES.rst
19
CHANGES.rst
|
@ -1,22 +1,3 @@
|
|||
2015-09-10
|
||||
Released 2.8.3
|
||||
Moved Bot._requestURL to its own class (telegram.utils.request)
|
||||
Much better, such wow, Telegram Objects tests
|
||||
Add consistency for str properties on Telegram Objects
|
||||
Better design to test if chat_id is invalid
|
||||
Add ability to set custom filename on Bot.sendDocument(..,filename='')
|
||||
Fix Sticker as InputFile
|
||||
Send JSON requests over urlencoded post data
|
||||
Markdown support for Bot.sendMessage(..., parse_mode=ParseMode.MARKDOWN)
|
||||
Refactor of TelegramError class (no more handling IOError or URLError)
|
||||
|
||||
|
||||
2015-09-05
|
||||
Released 2.8.2
|
||||
Fix regression on Telegram ReplyMarkup
|
||||
Add certificate to is_inputfile method
|
||||
|
||||
|
||||
2015-09-05
|
||||
Released 2.8.1
|
||||
Fix regression on Telegram objects with thumb properties
|
||||
|
|
|
@ -268,9 +268,9 @@ You may copy, distribute and modify the software provided that modifications are
|
|||
_`Contact`
|
||||
==========
|
||||
|
||||
Feel free to join to our `Telegram group <https://telegram.me/joinchat/00b9c0f802509b946b2e8e98b73e19be>`_.
|
||||
Feel free to join to our `Telegram group <https://telegram.me/joinchat/00b9c0f802509b94d52953d3fa1ec504>`_.
|
||||
|
||||
If you face trouble joining in the group please ping me `via Telegram <https://telegram.me/leandrotoledo>`_, I'll be glad to add you.
|
||||
*If you face trouble joining in the group please ping me on Telegram (@leandrotoledo), I'll be glad to add you.*
|
||||
|
||||
=======
|
||||
_`TODO`
|
||||
|
|
|
@ -60,7 +60,7 @@ author = u'Leandro Toledo'
|
|||
# The short X.Y version.
|
||||
version = '2.8'
|
||||
# The full version, including alpha/beta/rc tags.
|
||||
release = '2.8.3'
|
||||
release = '2.8.1'
|
||||
|
||||
# The language for content autogenerated by Sphinx. Refer to documentation
|
||||
# for a list of supported languages.
|
||||
|
|
87
examples/command_handler_example.py
Normal file
87
examples/command_handler_example.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
from telegram import CommandHandlerWithHelpAndFather, CommandHandler
|
||||
class ExampleCommandHandler(CommandHandlerWithHelpAndFather):
|
||||
"""This is an example how to use a CommandHandlerWithHelp or just a CommandHandler.
|
||||
|
||||
If You want to use a CommandHandler it is very easy.
|
||||
create a class which inherits a CommandHandler.
|
||||
create a method in this class which start with 'command_' and takes 1 argument: 'update' (which comes directly from
|
||||
getUpdate()).
|
||||
If you inherit CommandHandlerWithHelp it also creates a nice /help for you.
|
||||
"""
|
||||
def __init__(self, bot): # only necessary for a WithHelp
|
||||
super(ExampleCommandHandler, self).__init__(bot)
|
||||
self._help_title = 'Welcome this is a help file!' # optional
|
||||
self._help_before_list = """
|
||||
Yeah here I explain some things about this bot.
|
||||
and of course I can do this in Multiple lines.
|
||||
""" # default is empty
|
||||
self._help_list_title = ' These are the available commands:' # optional
|
||||
self._help_after_list = ' These are some footnotes' # default is empty
|
||||
self.is_reply = True # default is True
|
||||
|
||||
# only necessary if you want to override to default
|
||||
def _command_not_found(self, update):
|
||||
"""Inform the telegram user that the command was not found."""
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = "Sorry, I don't know how to do {command}.".format(command=update.message.text.split(' ')[0])
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
# creates /test command. This code gets called when a telegram user enters /test
|
||||
def command_test(self, update):
|
||||
""" Test if the server is online. """
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = 'Yeah, the server is online!'
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
# creates /parrot command
|
||||
def command_parrot(self, update):
|
||||
""" Says back what you say after the command"""
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
send = update.message.text.split(' ')
|
||||
message = update.message.text[len(send[0]):]
|
||||
if len(send) == 1:
|
||||
message = '...'
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
# creates /p command
|
||||
def command_p(self, update):
|
||||
"""Does the same as parrot."""
|
||||
return self.command_parrot(update)
|
||||
|
||||
# this doesn't create a command.
|
||||
def another_test(self, update):
|
||||
""" This won't be called by the CommandHandler.
|
||||
|
||||
This is an example of a function that isn't a command in telegram.
|
||||
Because it didn't start with 'command_'.
|
||||
"""
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = 'Yeah, this is another test'
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
|
||||
class Exampe2CommandHandler(CommandHandler):
|
||||
"""
|
||||
This is an example of a small working CommandHandler with only one command.
|
||||
"""
|
||||
def command_test(self, update):
|
||||
""" Test if the server is online. """
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = 'Yeah, the server is online!'
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
if __name__ == '__main__':
|
||||
import telegram as telegram
|
||||
try:
|
||||
from mytoken import token
|
||||
except:
|
||||
token = '' # use your own token here
|
||||
print ('token = ', token)
|
||||
Bot = telegram.Bot(token=token)
|
||||
test_command_handler = ExampleCommandHandler(Bot)
|
||||
test_command_handler.run()
|
2
setup.py
2
setup.py
|
@ -15,7 +15,7 @@ def read(*paths):
|
|||
|
||||
setup(
|
||||
name='python-telegram-bot',
|
||||
version='2.8.3',
|
||||
version='2.8.1',
|
||||
author='Leandro Toledo',
|
||||
author_email='leandrotoledodesouza@gmail.com',
|
||||
license='LGPLv3',
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
"""A library that provides a Python interface to the Telegram Bot API"""
|
||||
|
||||
__author__ = 'leandrotoledodesouza@gmail.com'
|
||||
__version__ = '2.8.3'
|
||||
__version__ = '2.8.1'
|
||||
|
||||
from .base import TelegramObject
|
||||
from .user import User
|
||||
|
@ -46,10 +46,12 @@ from .parsemode import ParseMode
|
|||
from .message import Message
|
||||
from .update import Update
|
||||
from .bot import Bot
|
||||
from .command_handler import *
|
||||
|
||||
__all__ = ['Bot', 'Emoji', 'TelegramError', 'InputFile', 'ReplyMarkup',
|
||||
'ForceReply', 'ReplyKeyboardHide', 'ReplyKeyboardMarkup',
|
||||
'UserProfilePhotos', 'ChatAction', 'Location', 'Contact',
|
||||
'Video', 'Sticker', 'Document', 'Audio', 'PhotoSize', 'GroupChat',
|
||||
'Update', 'ParseMode', 'Message', 'User', 'TelegramObject',
|
||||
'NullHandler', 'Voice']
|
||||
'Update', 'ParseMode', 'Message', 'User', 'TelegramObject', 'NullHandler',
|
||||
'Voice', 'CommandHandler', 'CommandHandlerWithHelp',
|
||||
'CommandHandlerWithFatherCommand', 'CommandHandlerWithHelpAndFather']
|
||||
|
|
|
@ -619,8 +619,7 @@ class Bot(TelegramObject):
|
|||
def getUpdates(self,
|
||||
offset=None,
|
||||
limit=100,
|
||||
timeout=0,
|
||||
requestTimeout=None):
|
||||
timeout=0):
|
||||
"""Use this method to receive incoming updates using long polling.
|
||||
|
||||
Args:
|
||||
|
@ -651,11 +650,7 @@ class Bot(TelegramObject):
|
|||
if timeout:
|
||||
data['timeout'] = timeout
|
||||
|
||||
kwargs = {}
|
||||
if not requestTimeout is None:
|
||||
kwargs['timeout'] = requestTimeout
|
||||
|
||||
result = request.post(url, data, **kwargs)
|
||||
result = request.post(url, data)
|
||||
|
||||
if result:
|
||||
self.logger.info(
|
||||
|
|
231
telegram/command_handler.py
Normal file
231
telegram/command_handler.py
Normal file
|
@ -0,0 +1,231 @@
|
|||
from inspect import getmembers, ismethod
|
||||
import threading
|
||||
import logging
|
||||
import telegram
|
||||
import time
|
||||
logger = logging.getLogger(__name__)
|
||||
__all__ = ['CommandHandler', 'CommandHandlerWithHelp', 'CommandHandlerWithFatherCommand',
|
||||
'CommandHandlerWithHelpAndFather']
|
||||
|
||||
|
||||
class CommandHandler(object):
|
||||
""" This handles incomming commands and gives an easy way to create commands.
|
||||
|
||||
How to use this:
|
||||
create a new class which inherits this class or CommandHandlerWithHelp.
|
||||
define new methods that start with 'command_' and then the command_name.
|
||||
run run()
|
||||
"""
|
||||
def __init__(self, bot):
|
||||
self.bot = bot # a telegram bot
|
||||
self.isValidCommand = None # a function that returns a boolean and takes one agrument an update.
|
||||
# If False is returned the the command is not executed.
|
||||
|
||||
def _get_command_func(self, command):
|
||||
if command[0] == '/':
|
||||
command = command[1:]
|
||||
if hasattr(self, 'command_' + command):
|
||||
return self.__getattribute__('command_' + command) # a function
|
||||
else:
|
||||
return None
|
||||
|
||||
def run(self, make_thread=True, last_update_id=None, thread_timeout=2, sleep=0.2):
|
||||
"""Continuously check for commands and run the according method
|
||||
|
||||
Args:
|
||||
make_thread:
|
||||
if True make a thread for each command it found.
|
||||
if False make run the code linearly
|
||||
last_update:
|
||||
the offset arg from getUpdates and is kept up to date within this function
|
||||
thread_timeout:
|
||||
The timeout on a thread. If a thread is alive after this period then try to join the thread in
|
||||
the next loop.
|
||||
"""
|
||||
|
||||
old_threads = []
|
||||
while True:
|
||||
time.sleep(sleep)
|
||||
threads, last_update_id = self.run_once(make_thread=make_thread, last_update_id=last_update_id)
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in old_threads:
|
||||
threads.append(t)
|
||||
old_threads = []
|
||||
for t in threads:
|
||||
t.join(timeout=thread_timeout)
|
||||
if t.isAlive():
|
||||
old_threads.append(t)
|
||||
|
||||
def run_once(self, make_thread=True, last_update_id=None):
|
||||
""" Check the the messages for commands and make a Thread with the command or run the command depending on make_thread.
|
||||
|
||||
Args:
|
||||
make_thread:
|
||||
True: the function returns a list with threads. Which didn't start yet.
|
||||
False: the function just runs the command it found and returns an empty list.
|
||||
last_update_id:
|
||||
the offset arg from getUpdates and is kept up to date within this function
|
||||
|
||||
Returns:
|
||||
A tuple of two elements. The first element is a list with threads which didn't start yet or an empty list if
|
||||
make_threads==False. The second element is the updated las_update_id
|
||||
"""
|
||||
bot_name = self.bot.getMe().username
|
||||
threads = []
|
||||
try:
|
||||
updates = self.bot.getUpdates(offset=last_update_id)
|
||||
except:
|
||||
updates = []
|
||||
for update in updates:
|
||||
last_update_id = update.update_id + 1
|
||||
message = update.message
|
||||
if message.text[0] == '/':
|
||||
command, username = message.text.split(' ')[0], bot_name
|
||||
if '@' in command:
|
||||
command, username = command.split('@')
|
||||
if username == bot_name:
|
||||
command_func = self._get_command_func(command)
|
||||
if command_func is not None:
|
||||
self.bot.sendChatAction(chat_id=update.message.chat.id, action=telegram.ChatAction.TYPING)
|
||||
if self.isValidCommand is None or self.isValidCommand(update):
|
||||
if make_thread:
|
||||
t = threading.Thread(target=command_func, args=(update,))
|
||||
threads.append(t)
|
||||
else:
|
||||
command_func(update)
|
||||
else:
|
||||
self._command_not_found(update) # TODO this must be another function.
|
||||
else:
|
||||
if make_thread:
|
||||
t = threading.Thread(target=self._command_not_found, args=(update,))
|
||||
threads.append(t)
|
||||
else:
|
||||
self._command_not_valid(update)
|
||||
return threads, last_update_id
|
||||
|
||||
def _command_not_valid(self, update):
|
||||
"""Inform the telegram user that the command was not found.
|
||||
|
||||
Override this method if you want to do it another way then by sending the the text:
|
||||
Sorry, I didn't understand the command: /command[@bot].
|
||||
"""
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = "Sorry, the command was not authorised or valid: {command}.".format(
|
||||
command=update.message.text.split(' ')[0])
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
def _command_not_found(self, update):
|
||||
"""Inform the telegram user that the command was not found.
|
||||
|
||||
Override this method if you want to do it another way then by sending the the text:
|
||||
Sorry, I didn't understand the command: /command[@bot].
|
||||
"""
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = "Sorry, I didn't understand the command: {command}.".format(command=update.message.text.split(' ')[0])
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
|
||||
class CommandHandlerWithHelp(CommandHandler):
|
||||
""" This CommandHandler has a builtin /help. It grabs the text from the docstrings of command_ functions."""
|
||||
def __init__(self, bot):
|
||||
super(CommandHandlerWithHelp, self).__init__(bot)
|
||||
self._help_title = 'Welcome to {name}.'.format(name=self.bot.getMe().username) # the title of help
|
||||
self._help_before_list = '' # text with information about the bot
|
||||
self._help_after_list = '' # a footer
|
||||
self._help_list_title = 'These are the commands:' # the title of the list
|
||||
self._help_extra_message = 'These commands are only usefull to the developer.'
|
||||
self.is_reply = True
|
||||
self.command_start = self.command_help
|
||||
self.skip_in_help = []
|
||||
|
||||
def command_helpextra(self,update):
|
||||
""" The commands in here are only usefull to the developer of the bot"""
|
||||
command_functions = [attr[1] for attr in getmembers(self, predicate=ismethod) if attr[0][:8] == 'command_' and
|
||||
attr[0] in self.skip_in_help]
|
||||
chat_id = update.message.chat.id
|
||||
help_message = self._help_extra_message + '\n'
|
||||
for command_function in command_functions:
|
||||
if command_function.__doc__ is not None:
|
||||
help_message += ' /' + command_function.__name__[8:] + ' - ' + command_function.__doc__ + '\n'
|
||||
else:
|
||||
help_message += ' /' + command_function.__name__[8:] + ' - ' + '\n'
|
||||
self.bot.sendMessage(chat_id=chat_id, text=help_message)
|
||||
|
||||
def _generate_help(self):
|
||||
""" Generate a string which can be send as a help file.
|
||||
|
||||
This function generates a help file from all the docstrings from the commands.
|
||||
so docstrings of methods that start with command_ should explain what a command does and how a to use the
|
||||
command to the telegram user.
|
||||
"""
|
||||
|
||||
help_message = self._help_title + '\n\n'
|
||||
help_message += self._help_before_list + '\n\n'
|
||||
help_message += self._help_list_title + '\n'
|
||||
help_message += self._generate_help_list()
|
||||
help_message += '\n'
|
||||
help_message += self._help_after_list
|
||||
return help_message
|
||||
|
||||
def _generate_help_list(self):
|
||||
command_functions = [attr[1] for attr in getmembers(self, predicate=ismethod) if attr[0][:8] == 'command_' and
|
||||
attr[0] not in self.skip_in_help]
|
||||
help_message = ''
|
||||
for command_function in command_functions:
|
||||
if command_function.__doc__ is not None:
|
||||
help_message += ' /' + command_function.__name__[8:] + ' - ' + command_function.__doc__ + '\n'
|
||||
else:
|
||||
help_message += ' /' + command_function.__name__[8:] + ' - ' + '\n'
|
||||
return help_message
|
||||
|
||||
def _command_not_found(self, update):
|
||||
"""Inform the telegram user that the command was not found."""
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = 'Sorry, I did not understand the command: {command}. Please see /help for all available commands'
|
||||
if self.is_reply:
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message.format(command=update.message.text.split(' ')[0]),
|
||||
reply_to_message_id=reply_to)
|
||||
else:
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message.format(command=update.message.text.split(' ')[0]))
|
||||
|
||||
def command_help(self, update):
|
||||
""" The help file. """
|
||||
chat_id = update.message.chat.id
|
||||
reply_to = update.message.message_id
|
||||
message = self._generate_help()
|
||||
self.bot.sendMessage(chat_id=chat_id, text=message, reply_to_message_id=reply_to)
|
||||
|
||||
|
||||
class CommandHandlerWithFatherCommand(CommandHandler):
|
||||
""" A class that creates some commands that are usefull when setting up the bot
|
||||
"""
|
||||
def __init__(self, bot):
|
||||
super(CommandHandlerWithFatherCommand, self).__init__(bot)
|
||||
self.skip_in_help = ['command_father']
|
||||
|
||||
def command_father(self, update):
|
||||
"""Gives you the commands you need to setup this bot. in telegram.me/BotFather"""
|
||||
chat_id = update.message.chat.id
|
||||
self.bot.sendMessage(chat_id=chat_id, text='Send the following messages to telegram.me/BotFather')
|
||||
self.bot.sendMessage(chat_id=chat_id, text='/setcommands')
|
||||
self.bot.sendMessage(chat_id=chat_id, text='@' + self.bot.getMe()['username'])
|
||||
commands = ''
|
||||
command_functions = [attr[1] for attr in getmembers(self, predicate=ismethod) if attr[0][:8] == 'command_' and
|
||||
attr[0] not in self.skip_in_help]
|
||||
|
||||
for command_function in command_functions:
|
||||
if command_function.__doc__ is not None:
|
||||
commands += command_function.__name__[8:] + ' - ' + command_function.__doc__ + '\n'
|
||||
else:
|
||||
commands += command_function.__name__[8:] + ' - ' + '\n'
|
||||
self.bot.sendMessage(chat_id=chat_id, text=commands)
|
||||
|
||||
|
||||
class CommandHandlerWithHelpAndFather(CommandHandlerWithFatherCommand, CommandHandlerWithHelp):
|
||||
"""A class that combines CommandHandlerWithHelp and CommandHandlerWithFatherCommand.
|
||||
"""
|
||||
pass
|
83
telegram/enchancedbot.py
Normal file
83
telegram/enchancedbot.py
Normal file
|
@ -0,0 +1,83 @@
|
|||
import telegram
|
||||
|
||||
|
||||
class NoSuchCommandException(BaseException):
|
||||
pass
|
||||
|
||||
class CommandDispatcher:
|
||||
def __init__(self,):
|
||||
self.commands = list()
|
||||
self.default = None
|
||||
|
||||
def addCommand(self, command, callback):
|
||||
self.commands.append((command, callback))
|
||||
|
||||
def setDefault(self, callback):
|
||||
self.default = callback
|
||||
|
||||
def dispatch(self, update):
|
||||
if hasattr(update.message, 'text'):
|
||||
text = update.message.text
|
||||
else:
|
||||
text = ''
|
||||
|
||||
user_id = update.message.from_user.id
|
||||
com = text.split('@')[0]
|
||||
for command, callback in self.commands:
|
||||
if com == command:
|
||||
return callback(command, user_id)
|
||||
if self.default is not None:
|
||||
return self.default(text, user_id)
|
||||
else:
|
||||
raise NoSuchCommandException()
|
||||
|
||||
|
||||
class EnhancedBot(telegram.Bot):
|
||||
"""The Bot class with command dispatcher added.
|
||||
|
||||
>>> bot = EnhancedBot(token=TOKEN)
|
||||
>>> @bot.command('/start')
|
||||
... def start(command, user_id):
|
||||
... # should return a tuple: (text, reply_id, custom_keyboard)
|
||||
... return ("Hello, there! Your id is {}".format(user_id), None, None)
|
||||
>>> while True:
|
||||
... bot.processUpdates()
|
||||
... time.sleep(3)
|
||||
"""
|
||||
def __init__(self, token):
|
||||
self.dispatcher = CommandDispatcher()
|
||||
telegram.Bot.__init__(self, token=token)
|
||||
self.offset = 0 #id of the last processed update
|
||||
|
||||
def command(self, *names, default=False):
|
||||
"""Decorator for adding callbacks for commands."""
|
||||
|
||||
def inner_command(callback):
|
||||
for name in names:
|
||||
self.dispatcher.addCommand(name, callback)
|
||||
if default:
|
||||
self.dispatcher.setDefault(callback)
|
||||
return callback # doesn't touch the callback, so we can use it
|
||||
return inner_command
|
||||
|
||||
def processUpdates(self):
|
||||
updates = self.getUpdates(offset=self.offset)
|
||||
|
||||
for update in updates:
|
||||
print('processing update: {}'.format(str(update.to_dict())))
|
||||
self.offset = update.update_id + 1
|
||||
if not hasattr(update, 'message'):
|
||||
continue
|
||||
|
||||
try:
|
||||
answer, reply_to, reply_markup = self.dispatcher.dispatch(update)
|
||||
except Exception as e:
|
||||
print('error occured') # TODO logging
|
||||
print(update.to_dict())
|
||||
raise e
|
||||
|
||||
if answer is not None:
|
||||
self.sendMessage(chat_id=update.message.chat_id,
|
||||
text=answer,
|
||||
reply_to_message_id=reply_to,
|
||||
reply_markup=reply_markup)
|
|
@ -17,10 +17,10 @@
|
|||
# You should have received a copy of the GNU Lesser Public License
|
||||
# along with this program. If not, see [http://www.gnu.org/licenses/].
|
||||
|
||||
"""This module contains a object that represents a Telegram Parse Modes"""
|
||||
"""This module contains a object that represents a Telegram Message Parse Modes"""
|
||||
|
||||
|
||||
class ParseMode(object):
|
||||
"""This object represents a Telegram Parse Modes."""
|
||||
"""This object represents a Telegram Message Parse Modes."""
|
||||
|
||||
MARKDOWN = 'Markdown'
|
||||
|
|
74
telegram/utils/botan.py
Normal file
74
telegram/utils/botan.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
#!/usr/bin/env python
|
||||
#
|
||||
# A library that provides a Python interface to the Telegram Bot API
|
||||
# Copyright (C) 2015 Leandro Toledo de Souza <leandrotoeldodesouza@gmail.com>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser Public License
|
||||
# along with this program. If not, see [http://www.gnu.org/licenses/].
|
||||
|
||||
import json
|
||||
try:
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import urlopen, Request
|
||||
from urllib.error import HTTPError, URLError
|
||||
except ImportError:
|
||||
from urllib import urlencode
|
||||
from urllib2 import urlopen, Request
|
||||
from urllib2 import HTTPError, URLError
|
||||
|
||||
DEFAULT_BASE_URL = \
|
||||
'https://api.botan.io/track?token=%(token)&uid=%(uid)&name=%(name)'
|
||||
USER_AGENT = 'Python Telegram Bot' \
|
||||
' (https://github.com/leandrotoledo/python-telegram-bot)'
|
||||
CONTENT_TYPE = 'application/json'
|
||||
|
||||
class Botan(Object):
|
||||
def __init__(self,
|
||||
token,
|
||||
base_url=None):
|
||||
self.token = token
|
||||
|
||||
if base_url is None:
|
||||
self.base_url = DEFAULT_BASE_URL % {'token': self.token}
|
||||
else:
|
||||
self.base_url = base_url % {'token': self.token}
|
||||
|
||||
def track(self,
|
||||
uid,
|
||||
text,
|
||||
name = 'Message'):
|
||||
|
||||
url = self.base_url % {'uid': uid,
|
||||
'message': text,
|
||||
'name': name}
|
||||
|
||||
self._post(url, message)
|
||||
|
||||
def _post(self,
|
||||
url,
|
||||
data):
|
||||
headers = {'User-agent': USER_AGENT,
|
||||
'Content-type': CONTENT_TYPE}
|
||||
|
||||
try:
|
||||
request = Request(
|
||||
url,
|
||||
data=urlencode(data).encode(),
|
||||
headers=headers
|
||||
)
|
||||
|
||||
return urlopen(request).read()
|
||||
except IOError as e:
|
||||
raise TelegramError(str(e))
|
||||
except HTTPError as e:
|
||||
raise TelegramError(str(e))
|
|
@ -21,67 +21,18 @@
|
|||
|
||||
import json
|
||||
|
||||
REQUESTS = False
|
||||
try:
|
||||
import requests
|
||||
REQUESTS = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import urlopen, Request
|
||||
from urllib.error import HTTPError
|
||||
from urllib.error import HTTPError, URLError
|
||||
except ImportError:
|
||||
from urllib import urlencode
|
||||
from urllib2 import urlopen, Request
|
||||
from urllib2 import HTTPError
|
||||
|
||||
if REQUESTS:
|
||||
_universal_post = requests.post
|
||||
else:
|
||||
_universal_post = Request
|
||||
from urllib2 import HTTPError, URLError
|
||||
|
||||
from telegram import (InputFile, TelegramError)
|
||||
|
||||
|
||||
def retry(maxretries):
|
||||
"""@retry(x) decorates a function to run x times
|
||||
if it doesn't return correctly.
|
||||
|
||||
Args:
|
||||
maxretries:
|
||||
Max number of retries until exception is raised further.
|
||||
|
||||
Returns:
|
||||
The decorated function.
|
||||
"""
|
||||
def retry_decorator(func):
|
||||
def retry_wrapper(*args, **kwargs):
|
||||
# logger.debug('Trying {} with arguments: {}, {}.'
|
||||
# .format(func.__name__, args, kwargs))
|
||||
usedtries = 0
|
||||
while usedtries <= (maxretries - 1):
|
||||
try:
|
||||
reply = func(*args, **kwargs)
|
||||
if reply:
|
||||
# logger.debug(
|
||||
# 'Reply: {}'.format(reply.replace('\n','\\n')))
|
||||
pass
|
||||
else:
|
||||
# logger.debug(
|
||||
# 'Reply: none')
|
||||
pass
|
||||
return reply
|
||||
except:
|
||||
if usedtries < (maxretries - 1):
|
||||
# logger.debug('Exception in {}: {}'
|
||||
# .format(func.__name__, args, kwargs))
|
||||
usedtries += 1
|
||||
else:
|
||||
raise
|
||||
retry_wrapper.fc = func.func_code
|
||||
return retry_wrapper
|
||||
return retry_decorator
|
||||
|
||||
def _parse(json_data):
|
||||
"""Try and parse the JSON returned from Telegram and return an empty
|
||||
dictionary if there is any error.
|
||||
|
@ -100,45 +51,29 @@ def _parse(json_data):
|
|||
|
||||
return data['result']
|
||||
|
||||
@retry(3)
|
||||
def get(url, timeout=None):
|
||||
|
||||
def get(url):
|
||||
"""Request an URL.
|
||||
Args:
|
||||
url:
|
||||
The web location we want to retrieve.
|
||||
timeout:
|
||||
(optional) timeout in seconds. Raises exception
|
||||
if request exceeds it.
|
||||
|
||||
Returns:
|
||||
A JSON object.
|
||||
"""
|
||||
|
||||
kwargs = {}
|
||||
if not timeout is None:
|
||||
kwargs['timeout'] = timeout
|
||||
|
||||
if REQUESTS:
|
||||
result = requests.get(url, **kwargs).content
|
||||
else:
|
||||
result = urlopen(url, **kwargs).read()
|
||||
result = urlopen(url).read()
|
||||
|
||||
return _parse(result)
|
||||
|
||||
|
||||
@retry(3)
|
||||
def post(url,
|
||||
data,
|
||||
timeout=None):
|
||||
data):
|
||||
"""Request an URL.
|
||||
Args:
|
||||
url:
|
||||
The web location we want to retrieve.
|
||||
data:
|
||||
A dict of (str, unicode) key/value pairs.
|
||||
timeout:
|
||||
(optional) timeout in seconds. Raises exception
|
||||
if request exceeds it.
|
||||
|
||||
Returns:
|
||||
A JSON object.
|
||||
|
@ -146,24 +81,16 @@ def post(url,
|
|||
try:
|
||||
if InputFile.is_inputfile(data):
|
||||
data = InputFile(data)
|
||||
kwargs = {
|
||||
'data': data.to_form(),
|
||||
'headers': data.headers
|
||||
}
|
||||
request = Request(url,
|
||||
data=data.to_form(),
|
||||
headers=data.headers)
|
||||
else:
|
||||
data = json.dumps(data)
|
||||
kwargs = {
|
||||
'data': data.encode(),
|
||||
'headers': {'Content-Type': 'application/json'}
|
||||
}
|
||||
if not timeout is None:
|
||||
kwargs['timeout'] = timeout
|
||||
request = _universal_post(url, **kwargs)
|
||||
if REQUESTS:
|
||||
request.raise_for_status()
|
||||
result = request.content
|
||||
else:
|
||||
result = urlopen(request).read()
|
||||
request = Request(url,
|
||||
data=data.encode(),
|
||||
headers={'Content-Type': 'application/json'})
|
||||
|
||||
result = urlopen(request).read()
|
||||
except HTTPError as error:
|
||||
if error.getcode() == 403:
|
||||
raise TelegramError('Unauthorized')
|
||||
|
|
75
tests/legacy/test_command_handler.py
Normal file
75
tests/legacy/test_command_handler.py
Normal file
|
@ -0,0 +1,75 @@
|
|||
import sys
|
||||
sys.path.append('.')
|
||||
import telegram
|
||||
import unittest
|
||||
import unittest.mock
|
||||
from telegram.command_handler import CommandHandler, CommandHandlerWithHelp
|
||||
|
||||
|
||||
class CommandHandlerTmp(CommandHandler):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CommandHandlerTmp, self).__init__(*args, **kwargs)
|
||||
self.output = None
|
||||
|
||||
def command_test(self, update):
|
||||
self.output = 1
|
||||
|
||||
|
||||
class CommandHandlerTmp2(CommandHandlerWithHelp):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CommandHandlerTmp2, self).__init__(*args, **kwargs)
|
||||
self.output_test = None
|
||||
|
||||
def command_test(self, update):
|
||||
self.output_test = 1
|
||||
|
||||
|
||||
def fake_getUpdates(*args, **kwargs):
|
||||
from_user = telegram.User(id=42, first_name='hello')
|
||||
message = telegram.Message(message_id=42, from_user=from_user, date=None, chat=from_user, text='/test')
|
||||
update = telegram.Update(update_id=42, message=message)
|
||||
return [update]
|
||||
|
||||
output_fsm = None
|
||||
|
||||
|
||||
def fake_sendMessage(chat_id, message, *args, **kwargs):
|
||||
global output_fsm
|
||||
output_fsm = (chat_id, message)
|
||||
return telegram.Message(43, 123, 000000, telegram.User(chat_id, 'test'), text=message)
|
||||
|
||||
|
||||
class CommandHandlerTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.bot = unittest.mock.MagicMock()
|
||||
self.bot.getUpdates = fake_getUpdates
|
||||
self.bot.sendMessage = fake_sendMessage
|
||||
|
||||
def test_get_command_func(self):
|
||||
CH = CommandHandlerTmp(self.bot)
|
||||
self.assertEqual(CH.command_test, CH._get_command_func('test'))
|
||||
self.assertEqual(CH.command_test, CH._get_command_func('/test'))
|
||||
self.assertEqual(None, CH._get_command_func('this function does not exsist'))
|
||||
|
||||
def test_run_once(self):
|
||||
CH = CommandHandlerTmp(self.bot)
|
||||
self.assertEqual(CH.output, None)
|
||||
threads, last_update = CH.run_once(make_thread=True)
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
self.assertEqual(CH.output, 1)
|
||||
|
||||
def test_run(self):
|
||||
pass # TODO implement test
|
||||
|
||||
def test__command_not_found(self):
|
||||
CH = CommandHandlerTmp(self.bot)
|
||||
CH._command_not_found(self.bot.getUpdates()[0])
|
||||
self.assertEqual(output_fsm, (42, "Sorry, I didn't understand the command: /test."))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
unittest.main(sys.argv)
|
Loading…
Reference in a new issue