mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2025-01-22 16:32:08 +01:00
480 lines
17 KiB
Python
480 lines
17 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# A library that provides a Python interface to the Telegram Bot API
|
|
# Copyright (C) 2015-2018
|
|
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser Public License
|
|
# along with this program. If not, see [http://www.gnu.org/licenses/].
|
|
"""This module contains helper functions."""
|
|
|
|
import datetime as dtm # dtm = "DateTime Module"
|
|
import time
|
|
|
|
from collections import defaultdict
|
|
from numbers import Number
|
|
|
|
try:
|
|
import ujson as json
|
|
except ImportError:
|
|
import json
|
|
from html import escape
|
|
|
|
import re
|
|
import signal
|
|
|
|
# From https://stackoverflow.com/questions/2549939/get-signal-names-from-numbers-in-python
|
|
# Lookup table from signal number to signal name (e.g. ``2 -> 'SIGINT'``). The names are walked
# in reverse alphabetical order, so if several names share one number, the alphabetically first
# name is written last and is the one that stays in the table.
_signames = {number: name
             for name, number in sorted(vars(signal).items(), reverse=True)
             if name.startswith('SIG') and not name.startswith('SIG_')}
|
|
|
|
|
|
def get_signal_name(signum):
    """Looks up the symbolic name (e.g. ``'SIGINT'``) that belongs to a signal number.

    Args:
        signum (:obj:`int`): The signal number.

    Returns:
        :obj:`str`: The name stored for ``signum`` in the module-level ``_signames`` table.

    Raises:
        KeyError: If ``signum`` is not a key of ``_signames``.
    """
    name = _signames[signum]
    return name
|
|
|
|
|
|
def escape_markdown(text):
    """Helper function to escape telegram markup symbols.

    Args:
        text (:obj:`str`): The text to escape.

    Returns:
        :obj:`str`: ``text`` with a backslash inserted in front of every asterisk, underscore,
        backtick and opening square bracket.
    """
    # Raw string on purpose: in a plain literal ``\*`` and ``\[`` are invalid escape sequences,
    # which newer Python versions warn about. The resulting character class is unchanged.
    escape_chars = r'\*_`\['
    return re.sub(r'([%s])' % escape_chars, r'\\\1', text)
|
|
|
|
|
|
# -------- date/time related helpers --------
|
|
# TODO: add generic specification of UTC for naive datetimes to docs
|
|
|
|
if hasattr(dtm, 'timezone'):
    # Python 3.3+: the standard library already provides a fixed-offset tzinfo and UTC
    _UtcOffsetTimezone = dtm.timezone
    _UTC = dtm.timezone.utc

    def _datetime_to_float_timestamp(dt_obj):
        # naive datetimes are interpreted as UTC
        aware_dt = dt_obj.replace(tzinfo=_UTC) if dt_obj.tzinfo is None else dt_obj
        return aware_dt.timestamp()
else:
    # Python < 3.3 (incl 2.7)

    class _UtcOffsetTimezone(dtm.tzinfo):
        """Hardcoded fixed-offset timezone (``datetime.timezone`` isn't available in py2)."""

        def __init__(self, offset):
            self.offset = offset

        def utcoffset(self, dt):
            return self.offset

        def dst(self, dt):
            return dtm.timedelta(0)

        def tzname(self, dt):
            return 'UTC +{}'.format(self.offset)

    _UTC = _UtcOffsetTimezone(dtm.timedelta(0))
    __EPOCH_DT = dtm.datetime.fromtimestamp(0, tz=_UTC)
    __NAIVE_EPOCH_DT = __EPOCH_DT.replace(tzinfo=None)

    # Not using future.backports.datetime here as the datetime value might be an input from the
    # user, making every isinstance() call more delicate. So we just use our own compat layer.
    def _datetime_to_float_timestamp(dt_obj):
        # subtract the epoch of matching "awareness"; naive and aware values can't be mixed
        if dt_obj.tzinfo is None:
            elapsed = dt_obj - __NAIVE_EPOCH_DT
        else:
            elapsed = dt_obj - __EPOCH_DT
        return elapsed.total_seconds()

# Both implementations share the same documentation
_datetime_to_float_timestamp.__doc__ = (
    """Converts a datetime object to a float timestamp (with sub-second precision).
If the datetime object is timezone-naive, it is assumed to be in UTC."""
)
|
|
|
|
|
|
def to_float_timestamp(t, reference_timestamp=None):
    """
    Converts a given time object to a float POSIX timestamp.
    Used to convert different time specifications to a common format. The time object
    can be relative (i.e. indicate a time increment, or a time of day) or absolute.
    Any objects from the :mod:`datetime` module that are timezone-naive will be assumed
    to be in UTC.

    Args:
        t (int | float | datetime.timedelta | datetime.datetime | datetime.time):
            Time value to convert. The semantics of this parameter will depend on its type:

            * :obj:`int` or :obj:`float` will be interpreted as
              "seconds from ``reference_timestamp``"
            * :obj:`datetime.timedelta` will be interpreted as
              "time increment from ``reference_timestamp``"
            * :obj:`datetime.datetime` will be interpreted as an absolute date/time value
            * :obj:`datetime.time` will be interpreted as a specific time of day

        reference_timestamp (float, optional): POSIX timestamp that indicates the absolute time
            from which relative calculations are to be performed (e.g. when ``t`` is given as an
            :obj:`int`, indicating "seconds from ``reference_timestamp``"). Defaults to now (the
            time at which this function is called).

            If ``t`` is given as an absolute representation of date & time (i.e. a
            ``datetime.datetime`` object), ``reference_timestamp`` is not relevant and so its
            value should be ``None``. If this is not the case, a ``ValueError`` will be raised.

    Returns:
        float: The return value depends on the type of argument ``t``. If ``t`` is
        given as a time increment (i.e. as an :obj:`int`, :obj:`float` or
        :obj:`datetime.timedelta`), then the return value will be
        ``reference_timestamp`` + ``t``.

        Else if it is given as an absolute date/time value (i.e. a :obj:`datetime.datetime`
        object), the equivalent value as a POSIX timestamp will be returned.

        Finally, if it is a time of the day without date (i.e. a :obj:`datetime.time`
        object), the return value is the nearest occurrence of that time of day that is not
        before ``reference_timestamp``.

    Raises:
        TypeError: if ``t``'s type is not one of those described above (this includes ``None``)
        ValueError: if ``t`` is a :obj:`datetime.datetime` and ``reference_timestamp`` is not
            ``None``
    """

    if reference_timestamp is None:
        reference_timestamp = time.time()
    elif isinstance(t, dtm.datetime):
        raise ValueError('t is an (absolute) datetime while reference_timestamp is not None')

    if isinstance(t, dtm.timedelta):
        return reference_timestamp + t.total_seconds()
    elif isinstance(t, Number):
        return reference_timestamp + t
    elif isinstance(t, dtm.time):
        if t.tzinfo is not None:
            reference_dt = dtm.datetime.fromtimestamp(reference_timestamp, tz=t.tzinfo)
        else:
            # Assume UTC. Built from an aware UTC datetime and then stripped of its tzinfo,
            # because ``datetime.utcfromtimestamp`` is deprecated in recent Python versions.
            reference_dt = dtm.datetime.fromtimestamp(
                reference_timestamp, tz=_UTC).replace(tzinfo=None)
        reference_date = reference_dt.date()
        reference_time = reference_dt.timetz()
        if reference_time > t:  # if the time of day has passed today, use tomorrow
            reference_date += dtm.timedelta(days=1)
        return _datetime_to_float_timestamp(dtm.datetime.combine(reference_date, t))
    elif isinstance(t, dtm.datetime):
        return _datetime_to_float_timestamp(t)

    raise TypeError('Unable to convert {} object to timestamp'.format(type(t).__name__))
|
|
|
|
|
|
def to_timestamp(dt_obj, reference_timestamp=None):
    """
    Integer flavour of :func:`to_float_timestamp`: the float result is passed through ``int()``,
    which drops the fractional part.

    ``None`` s are left alone (i.e. ``to_timestamp(None)`` is ``None``).

    See the documentation for :func:`to_float_timestamp` for more details.
    """
    if dt_obj is None:
        return None
    return int(to_float_timestamp(dt_obj, reference_timestamp))
|
|
|
|
|
|
def from_timestamp(unixtime):
    """
    Converts an (integer) unix timestamp to a naive datetime object in UTC.
    ``None`` s are left alone (i.e. ``from_timestamp(None)`` is ``None``).

    Args:
        unixtime (int): integer POSIX timestamp

    Returns:
        equivalent :obj:`datetime.datetime` value in naive UTC if ``unixtime`` is not
        ``None``; else ``None``
    """
    if unixtime is None:
        return None

    # ``datetime.utcfromtimestamp`` is deprecated in recent Python versions. Converting to an
    # aware UTC datetime and dropping the tzinfo yields the same naive UTC value.
    return dtm.datetime.fromtimestamp(unixtime, tz=_UTC).replace(tzinfo=None)
|
|
|
|
# -------- end --------
|
|
|
|
|
|
def mention_html(user_id, name):
    """
    Builds an inline mention of a user in HTML markup.

    Args:
        user_id (:obj:`int`): The user's id which you want to mention.
        name (:obj:`str`): The name the mention is showing. It is HTML-escaped.

    Returns:
        :obj:`str`: The inline mention for the user as html, or ``None`` if ``user_id`` is not
        an :obj:`int`.
    """
    if not isinstance(user_id, int):
        return None
    return u'<a href="tg://user?id={}">{}</a>'.format(user_id, escape(name))
|
|
|
|
|
|
def mention_markdown(user_id, name):
    """
    Builds an inline mention of a user in Markdown markup.

    Args:
        user_id (:obj:`int`): The user's id which you want to mention.
        name (:obj:`str`): The name the mention is showing. It is passed through
            :func:`escape_markdown`.

    Returns:
        :obj:`str`: The inline mention for the user as markdown, or ``None`` if ``user_id`` is
        not an :obj:`int`.
    """
    if not isinstance(user_id, int):
        return None
    return u'[{}](tg://user?id={})'.format(escape_markdown(name), user_id)
|
|
|
|
|
|
def effective_message_type(entity):
    """
    Extracts the type of message as a string identifier from a :class:`telegram.Message` or a
    :class:`telegram.Update`.

    Args:
        entity (:obj:`Update` | :obj:`Message`): The ``update`` or ``message`` to extract from

    Returns:
        str: The first entry of ``Message.MESSAGE_TYPES`` for which the message has a truthy
        attribute of that name, or ``None`` if there is no such entry.

    Raises:
        TypeError: If ``entity`` is neither a ``Message`` nor an ``Update``.

    """

    # Imported locally: importing on file-level yields cyclic Import Errors
    from telegram import Message, Update

    if not isinstance(entity, (Message, Update)):
        raise TypeError("entity is not Message or Update (got: {})".format(type(entity)))

    # A Message is inspected directly; for an Update its effective message is used
    message = entity if isinstance(entity, Message) else entity.effective_message

    return next(
        (type_name for type_name in Message.MESSAGE_TYPES if getattr(message, type_name, None)),
        None
    )
|
|
|
|
|
|
def create_deep_linked_url(bot_username, payload=None, group=False):
    """
    Creates a deep-linked URL for this ``bot_username`` with the specified ``payload``.
    See https://core.telegram.org/bots#deep-linking to learn more.

    The ``payload`` may consist of the following characters: ``A-Z, a-z, 0-9, _, -``

    Note:
        Works well in conjunction with
        ``CommandHandler("start", callback, filters = Filters.regex('payload'))``

    Examples:
        ``create_deep_linked_url(bot.get_me().username, "some-params")``

    Args:
        bot_username (:obj:`str`): The username to link to
        payload (:obj:`str`, optional): Parameters to encode in the created URL
        group (:obj:`bool`, optional): If `True` the user is prompted to select a group to add the
            bot to. If `False`, opens a one-on-one conversation with the bot. Defaults to `False`.

    Returns:
        :obj:`str`: An URL to start the bot with specific parameters

    Raises:
        ValueError: If ``bot_username`` is ``None`` or not longer than 3 characters, or if
            ``payload`` is longer than 64 characters or contains characters that are not allowed.
    """
    if bot_username is None or len(bot_username) <= 3:
        raise ValueError("You must provide a valid bot_username.")

    base_url = 'https://t.me/{}'.format(bot_username)
    if not payload:
        return base_url

    if len(payload) > 64:
        raise ValueError("The deep-linking payload must not exceed 64 characters.")

    # Anchor with ``\Z`` rather than ``$``: ``$`` also matches just before a trailing newline,
    # which would let a payload such as ``"abc\n"`` slip through and end up in the URL.
    if not re.match(r'^[A-Za-z0-9_-]+\Z', payload):
        raise ValueError("Only the following characters are allowed for deep-linked "
                         "URLs: A-Z, a-z, 0-9, _ and -")

    key = 'startgroup' if group else 'start'

    return '{0}?{1}={2}'.format(base_url, key, payload)
|
|
|
|
|
|
def encode_conversations_to_json(conversations):
    """Helper method to encode a conversations dict (that uses tuples as keys) in a
    JSON-serializable way. Use :func:`decode_conversations_from_json` to decode.

    Args:
        conversations (:obj:`dict`): The conversations dict to transform to JSON.

    Returns:
        :obj:`str`: The JSON-serialized conversations dict
    """
    # Tuples can't be JSON object keys, so every inner key is replaced by its own JSON dump
    serializable = {
        handler: {json.dumps(key): state for key, state in states.items()}
        for handler, states in conversations.items()
    }
    return json.dumps(serializable)
|
|
|
|
|
|
def decode_conversations_from_json(json_string):
    """Helper method to decode a conversations dict (that uses tuples as keys) from a
    JSON-string created with :func:`encode_conversations_to_json`.

    Args:
        json_string (:obj:`str`): The conversations dict as JSON string.

    Returns:
        :obj:`dict`: The conversations dict after decoding
    """
    # Every inner key is itself a JSON document; decoding it and wrapping it in ``tuple``
    # restores the tuple key
    return {
        handler: {tuple(json.loads(key)): state for key, state in states.items()}
        for handler, states in json.loads(json_string).items()
    }
|
|
|
|
|
|
def decode_user_chat_data_from_json(data):
    """Helper method to decode chat or user data (that uses ints as keys) from a
    JSON-string.

    Args:
        data (:obj:`str`): The user/chat_data dict as JSON string.

    Returns:
        :obj:`dict`: The user/chat_data defaultdict after decoding. Top-level keys are converted
        to :obj:`int`; second-level keys are converted to :obj:`int` where possible and kept
        as they are otherwise.
    """

    result = defaultdict(dict)
    for raw_id, entries in json.loads(data).items():
        converted = result[int(raw_id)] = {}
        for raw_key, value in entries.items():
            try:
                key = int(raw_key)
            except ValueError:
                # not an integer key - keep the string
                key = raw_key
            converted[key] = value
    return result
|
|
|
|
|
|
class DefaultValue:
    """Wrapper for immutable default arguments that makes it possible to check whether the
    default value was set explicitly. Usage::

        DefaultOne = DefaultValue(1)
        def f(arg=DefaultOne):
            if arg is DefaultOne:
                print('`arg` is the default')
                arg = arg.value
            else:
                print('`arg` was set explicitly')
            print('`arg` = ' + str(arg))

    This yields::

        >>> f()
        `arg` is the default
        `arg` = 1
        >>> f(1)
        `arg` was set explicitly
        `arg` = 1
        >>> f(2)
        `arg` was set explicitly
        `arg` = 2

    Also allows to evaluate truthiness::

        default = DefaultValue(value)
        if default:
            ...

    is equivalent to::

        default = DefaultValue(value)
        if value:
            ...

    Attributes:
        value (:obj:`obj`): The value of the default argument

    Args:
        value (:obj:`obj`): The value of the default argument
    """
    def __init__(self, value=None):
        self.value = value

    def __bool__(self):
        # The wrapper is exactly as truthy as the value it wraps
        return True if self.value else False

    # Python 2.x looks up ``__nonzero__`` instead of ``__bool__``
    __nonzero__ = __bool__
|
|
|
|
|
|
DEFAULT_NONE = DefaultValue(value=None)
""":class:`DefaultValue`: Default `None`"""
|
|
|
|
|
|
class Defaults:
    """Convenience Class to gather all parameters with a (user defined) default value

    Attributes:
        parse_mode (:obj:`str`): Optional. Send Markdown or HTML, if you want Telegram apps to show
            bold, italic, fixed-width text or URLs in your bot's message.
        disable_notification (:obj:`bool`): Optional. Sends the message silently. Users will
            receive a notification with no sound.
        disable_web_page_preview (:obj:`bool`): Optional. Disables link previews for links in this
            message.
        timeout (:obj:`int` | :obj:`float`): Optional. If this value is specified, use it as the
            read timeout from the server (instead of the one specified during creation of the
            connection pool).
        quote (:obj:`bool`): Optional. If set to ``True``, the reply is sent as an actual reply to
            the message. If ``reply_to_message_id`` is passed in ``kwargs``, this parameter will
            be ignored. Default: ``True`` in group chats and ``False`` in private chats.

    Parameters:
        parse_mode (:obj:`str`, optional): Send Markdown or HTML, if you want Telegram apps to show
            bold, italic, fixed-width text or URLs in your bot's message.
        disable_notification (:obj:`bool`, optional): Sends the message silently. Users will
            receive a notification with no sound.
        disable_web_page_preview (:obj:`bool`, optional): Disables link previews for links in this
            message.
        timeout (:obj:`int` | :obj:`float`, optional): If this value is specified, use it as the
            read timeout from the server (instead of the one specified during creation of the
            connection pool).
        quote (:obj:`bool`, optional): If set to ``True``, the reply is sent as an actual reply to
            the message. If ``reply_to_message_id`` is passed in ``kwargs``, this parameter will
            be ignored. Default: ``True`` in group chats and ``False`` in private chats.
    """
    def __init__(self,
                 parse_mode=None,
                 disable_notification=None,
                 disable_web_page_preview=None,
                 # Timeout needs special treatment, since the bot methods have two different
                 # default values for timeout (None and 20s)
                 timeout=DEFAULT_NONE,
                 quote=None):
        self.parse_mode = parse_mode
        self.disable_notification = disable_notification
        self.disable_web_page_preview = disable_web_page_preview
        self.timeout = timeout
        self.quote = quote

    def __hash__(self):
        # Hash over every setting, in a fixed order
        settings = (
            self.parse_mode,
            self.disable_notification,
            self.disable_web_page_preview,
            self.timeout,
            self.quote,
        )
        return hash(settings)

    def __eq__(self, other):
        # Only another ``Defaults`` carrying exactly the same attributes compares equal
        return isinstance(other, Defaults) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|