Add Update.effective_sender (#4168)

This commit is contained in:
Abdelrahman Elkheir 2024-03-30 19:21:13 +02:00 committed by GitHub
parent 2d8d43f2a5
commit 23536ee759
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 144 additions and 4 deletions

View file

@ -18,7 +18,7 @@
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains an object that represents a Telegram Update."""
from typing import TYPE_CHECKING, Final, List, Optional
from typing import TYPE_CHECKING, Final, List, Optional, Union
from telegram import constants
from telegram._callbackquery import CallbackQuery
@ -224,6 +224,7 @@ class Update(TelegramObject):
__slots__ = (
"_effective_chat",
"_effective_message",
"_effective_sender",
"_effective_user",
"callback_query",
"channel_post",
@ -371,6 +372,7 @@ class Update(TelegramObject):
self.message_reaction_count: Optional[MessageReactionCountUpdated] = message_reaction_count
self._effective_user: Optional[User] = None
self._effective_sender: Optional[Union["User", "Chat"]] = None
self._effective_chat: Optional[Chat] = None
self._effective_message: Optional[Message] = None
@ -444,6 +446,58 @@ class Update(TelegramObject):
self._effective_user = user
return user
@property
def effective_sender(self) -> Optional[Union["User", "Chat"]]:
"""
:class:`telegram.User` or :class:`telegram.Chat`: The user or chat that sent this update,
no matter what kind of update this is.
Note:
* Depending on the type of update and the user's 'Remain anonymous' setting, this
could either be :class:`telegram.User`, :class:`telegram.Chat` or :obj:`None`.
If no user whatsoever is associated with this update, this gives :obj:`None`. This
is the case if any of
* :attr:`poll`
* :attr:`chat_boost`
* :attr:`removed_chat_boost`
* :attr:`message_reaction_count`
is present.
Example:
* If :attr:`message` is present, this will give either
:attr:`telegram.Message.from_user` or :attr:`telegram.Message.sender_chat`.
* If :attr:`poll_answer` is present, this will give either
:attr:`telegram.PollAnswer.user` or :attr:`telegram.PollAnswer.voter_chat`.
* If :attr:`channel_post` is present, this will give
:attr:`telegram.Message.sender_chat`.
.. versionadded:: NEXT.VERSION
"""
if self._effective_sender:
return self._effective_sender
sender: Optional[Union["User", "Chat"]] = None
if message := (
self.message or self.edited_message or self.channel_post or self.edited_channel_post
):
sender = message.sender_chat
elif self.poll_answer:
sender = self.poll_answer.voter_chat
elif self.message_reaction:
sender = self.message_reaction.actor_chat
if sender is None:
sender = self.effective_user
self._effective_sender = sender
return sender
@property
def effective_chat(self) -> Optional["Chat"]:
"""

View file

@ -17,6 +17,7 @@
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import time
from copy import deepcopy
from datetime import datetime
import pytest
@ -51,7 +52,21 @@ from telegram._utils.datetime import from_timestamp
from telegram.warnings import PTBUserWarning
from tests.auxil.slots import mro_slots
message = Message(1, datetime.utcnow(), Chat(1, ""), from_user=User(1, "", False), text="Text")
message = Message(
1,
datetime.utcnow(),
Chat(1, ""),
from_user=User(1, "", False),
text="Text",
sender_chat=Chat(1, ""),
)
channel_post = Message(
1,
datetime.utcnow(),
Chat(1, ""),
text="Text",
sender_chat=Chat(1, ""),
)
chat_member_updated = ChatMemberUpdated(
Chat(1, "chat"),
User(1, "", False),
@ -93,6 +108,7 @@ message_reaction = MessageReactionUpdated(
old_reaction=(ReactionTypeEmoji("👍"),),
new_reaction=(ReactionTypeEmoji("👍"),),
user=User(1, "name", False),
actor_chat=Chat(1, ""),
)
@ -108,8 +124,8 @@ params = [
{"message": message},
{"edited_message": message},
{"callback_query": CallbackQuery(1, User(1, "", False), "chat", message=message)},
{"channel_post": message},
{"edited_channel_post": message},
{"channel_post": channel_post},
{"edited_channel_post": channel_post},
{"inline_query": InlineQuery(1, User(1, "", False), "", "")},
{"chosen_inline_result": ChosenInlineResult("id", User(1, "", False), "")},
{"shipping_query": ShippingQuery("id", User(1, "", False), "", None)},
@ -261,6 +277,76 @@ class TestUpdateWithoutRequest(TestUpdateBase):
else:
assert user is None
def test_effective_sender_non_anonymous(self, update):
update = deepcopy(update)
# Simulate 'Remain anonymous' being turned off
if message := (update.message or update.edited_message):
message._unfreeze()
message.sender_chat = None
elif reaction := (update.message_reaction):
reaction._unfreeze()
reaction.actor_chat = None
elif answer := (update.poll_answer):
answer._unfreeze()
answer.voter_chat = None
# Test that it's sometimes None per docstring
sender = update.effective_sender
if not (
update.poll is not None
or update.chat_boost is not None
or update.removed_chat_boost is not None
or update.message_reaction_count is not None
):
if update.channel_post or update.edited_channel_post:
assert isinstance(sender, Chat)
else:
assert isinstance(sender, User)
else:
assert sender is None
cached = update.effective_sender
assert cached is sender
def test_effective_sender_anonymous(self, update):
update = deepcopy(update)
# Simulate 'Remain anonymous' being turned on
if message := (update.message or update.edited_message):
message._unfreeze()
message.from_user = None
elif reaction := (update.message_reaction):
reaction._unfreeze()
reaction.user = None
elif answer := (update.poll_answer):
answer._unfreeze()
answer.user = None
# Test that it's sometimes None per docstring
sender = update.effective_sender
if not (
update.poll is not None
or update.chat_boost is not None
or update.removed_chat_boost is not None
or update.message_reaction_count is not None
):
if (
update.message
or update.edited_message
or update.channel_post
or update.edited_channel_post
or update.message_reaction
or update.poll_answer
):
assert isinstance(sender, Chat)
else:
assert isinstance(sender, User)
else:
assert sender is None
cached = update.effective_sender
assert cached is sender
def test_effective_message(self, update):
# Test that it's sometimes None per docstring
eff_message = update.effective_message