pyrofork/pyrogram/methods/messages/send_media_group.py
wulan17 aee9eefbbf
pyrofork: Add reply_to_monoforum_id parameter to supported send_*
Signed-off-by: wulan17 <wulan17@komodos.id>
2025-06-10 20:42:01 +07:00

561 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Pyrofork - Telegram MTProto API Client Library for Python
# Copyright (C) 2017-present Dan <https://github.com/delivrance>
# Copyright (C) 2022-present Mayuri-Chan <https://github.com/Mayuri-Chan>
#
# This file is part of Pyrofork.
#
# Pyrofork is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrofork is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrofork. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import re
from datetime import datetime
from pymediainfo import MediaInfo
from typing import (
Union,
List,
Optional,
Callable,
)
import pyrogram
from pyrogram import enums
from pyrogram import raw
from pyrogram import types
from pyrogram import utils
from pyrogram.file_id import FileType
log = logging.getLogger(__name__)
def _is_http_url(value: str) -> bool:
    """Return True when ``value`` starts with ``http://`` or ``https://``."""
    return re.match(r"^https?://", value) is not None


def _is_remote_reference(source) -> bool:
    """Return True when ``source`` is a string that is not an existing local file.

    Such strings are handled as either an HTTP(S) URL or a file_id; local
    paths and non-string (file-like) sources are uploaded instead.
    """
    return isinstance(source, str) and not os.path.isfile(source)


def _source_name(source, default_name: str) -> str:
    """Return the name used for MIME guessing: the path itself, or the object's ``name``."""
    return source if isinstance(source, str) else getattr(source, "name", default_name)


def _filename_attribute(item, default_name: str):
    """Build the DocumentAttributeFilename for ``item``.

    An explicit ``item.file_name`` wins; otherwise the base name of a path, or
    the ``name`` attribute of a file-like object (``default_name`` if missing).
    """
    source = item.media
    return raw.types.DocumentAttributeFilename(
        file_name=item.file_name or (
            os.path.basename(source)
            if isinstance(source, str)
            else getattr(source, "name", default_name)
        )
    )


def _is_soundless_video(item) -> bool:
    """Inspect the local file behind ``item`` and report whether it has no audio track.

    When MediaInfo raises OSError the decision falls back to the item's type:
    only an InputMediaAnimation is then treated as soundless.
    """
    try:
        info = MediaInfo.parse(item.media)
    except OSError:
        return isinstance(item, types.InputMediaAnimation)
    return not any(track.track_type == "Audio" for track in info.tracks)


async def _upload(client: "pyrogram.Client", peer, uploadable):
    """Invoke messages.UploadMedia for ``uploadable`` and return the server's answer."""
    return await client.invoke(
        raw.functions.messages.UploadMedia(peer=peer, media=uploadable)
    )


async def _upload_as_document(client: "pyrogram.Client", peer, uploadable, **flags):
    """Upload ``uploadable`` and return an InputMediaDocument referencing the result.

    ``flags`` (e.g. ``spoiler``) are forwarded unchanged to InputMediaDocument.
    """
    document = (await _upload(client, peer, uploadable)).document
    return raw.types.InputMediaDocument(
        id=raw.types.InputDocument(
            id=document.id,
            access_hash=document.access_hash,
            file_reference=document.file_reference,
        ),
        **flags,
    )


async def _remote_document(client: "pyrogram.Client", peer, source: str, file_type, **flags):
    """Resolve a non-local string source of a document-like item.

    An HTTP(S) URL is uploaded as InputMediaDocumentExternal; anything else is
    decoded as a file_id of ``file_type``. ``flags`` apply to the URL path only.
    """
    if _is_http_url(source):
        return await _upload_as_document(
            client,
            peer,
            raw.types.InputMediaDocumentExternal(url=source, **flags),
            **flags,
        )
    return utils.get_input_media_from_file_id(source, file_type)


async def _save_local_document(
    client: "pyrogram.Client",
    item,
    progress: Optional[Callable],
    progress_args: tuple,
    *,
    default_name: str,
    default_mime: str,
    attributes: list,
    **flags
):
    """Save the thumb and the file of ``item`` and wrap them in InputMediaUploadedDocument.

    The thumb is saved first, then the media (the only step reporting progress).
    ``flags`` are forwarded unchanged to InputMediaUploadedDocument.
    """
    thumb = await client.save_file(item.thumb)
    file = await client.save_file(item.media, progress=progress, progress_args=progress_args)
    return raw.types.InputMediaUploadedDocument(
        file=file,
        thumb=thumb,
        mime_type=client.guess_mime_type(_source_name(item.media, default_name)) or default_mime,
        attributes=attributes,
        **flags,
    )


async def _prepare_photo(client: "pyrogram.Client", peer, item, progress, progress_args):
    """Convert an InputMediaPhoto item into a raw input media usable in an album."""
    source = item.media
    if _is_remote_reference(source):
        if not _is_http_url(source):
            return utils.get_input_media_from_file_id(source, FileType.PHOTO)
        uploadable = raw.types.InputMediaPhotoExternal(url=source, spoiler=item.has_spoiler)
    else:
        # Local paths and file-like objects are uploaded the same way.
        uploadable = raw.types.InputMediaUploadedPhoto(
            file=await client.save_file(source, progress=progress, progress_args=progress_args),
            spoiler=item.has_spoiler,
        )

    photo = (await _upload(client, peer, uploadable)).photo
    return raw.types.InputMediaPhoto(
        id=raw.types.InputPhoto(
            id=photo.id,
            access_hash=photo.access_hash,
            file_reference=photo.file_reference,
        ),
        spoiler=item.has_spoiler,
    )


async def _prepare_video(client: "pyrogram.Client", peer, item, progress, progress_args):
    """Convert an InputMediaVideo / InputMediaAnimation item into a raw input media."""
    source = item.media
    spoiler = item.has_spoiler
    if _is_remote_reference(source):
        return await _remote_document(client, peer, source, FileType.VIDEO, spoiler=spoiler)

    # Animation detection is only attempted for local paths, never for
    # file-like objects.
    from_path = isinstance(source, str)
    is_animation = from_path and _is_soundless_video(item)

    attributes = [
        raw.types.DocumentAttributeVideo(
            # Short-circuits on animations so ``item.supports_streaming`` is
            # only read when the item is handled as a regular video.
            supports_streaming=True if is_animation else (item.supports_streaming or None),
            duration=item.duration,
            w=item.width,
            h=item.height,
        ),
        _filename_attribute(item, "video.mp4"),
    ]
    if is_animation:
        attributes.append(raw.types.DocumentAttributeAnimated())

    flags = {"spoiler": spoiler}
    if from_path:
        # ``nosound_video`` is only sent for local paths (as True or False).
        flags["nosound_video"] = is_animation

    uploadable = await _save_local_document(
        client,
        item,
        progress,
        progress_args,
        default_name="video.mp4",
        default_mime="video/mp4",
        attributes=attributes,
        **flags,
    )
    return await _upload_as_document(client, peer, uploadable, spoiler=spoiler)


async def _prepare_audio(client: "pyrogram.Client", peer, item, progress, progress_args):
    """Convert an InputMediaAudio item into a raw input media usable in an album."""
    if _is_remote_reference(item.media):
        return await _remote_document(client, peer, item.media, FileType.AUDIO)

    uploadable = await _save_local_document(
        client,
        item,
        progress,
        progress_args,
        default_name="audio.mp3",
        default_mime="audio/mpeg",
        attributes=[
            raw.types.DocumentAttributeAudio(
                duration=item.duration,
                performer=item.performer,
                title=item.title,
            ),
            _filename_attribute(item, "audio.mp3"),
        ],
    )
    return await _upload_as_document(client, peer, uploadable)


async def _prepare_document(client: "pyrogram.Client", peer, item, progress, progress_args):
    """Convert an InputMediaDocument item into a raw input media usable in an album."""
    if _is_remote_reference(item.media):
        return await _remote_document(client, peer, item.media, FileType.DOCUMENT)

    uploadable = await _save_local_document(
        client,
        item,
        progress,
        progress_args,
        default_name="file.zip",
        default_mime="application/zip",
        attributes=[_filename_attribute(item, "file.zip")],
    )
    return await _upload_as_document(client, peer, uploadable)


async def _prepare_group_item(client: "pyrogram.Client", peer, item, progress, progress_args):
    """Dispatch ``item`` to the converter matching its InputMedia* type.

    Raises:
        ValueError: If ``item`` is not a photo, video, animation, audio or document.
    """
    if isinstance(item, types.InputMediaPhoto):
        return await _prepare_photo(client, peer, item, progress, progress_args)
    if isinstance(item, (types.InputMediaVideo, types.InputMediaAnimation)):
        return await _prepare_video(client, peer, item, progress, progress_args)
    if isinstance(item, types.InputMediaAudio):
        return await _prepare_audio(client, peer, item, progress, progress_args)
    if isinstance(item, types.InputMediaDocument):
        return await _prepare_document(client, peer, item, progress, progress_args)
    raise ValueError(f"{item.__class__.__name__} is not a supported type for send_media_group")


class SendMediaGroup:
    async def send_media_group(
        self: "pyrogram.Client",
        chat_id: Union[int, str],
        media: List[Union[
            "types.InputMediaPhoto",
            "types.InputMediaVideo",
            "types.InputMediaAudio",
            "types.InputMediaDocument",
            "types.InputMediaAnimation"
        ]],
        disable_notification: bool = None,
        message_thread_id: int = None,
        business_connection_id: str = None,
        reply_to_message_id: int = None,
        reply_to_story_id: int = None,
        reply_to_chat_id: Union[int, str] = None,
        reply_to_monoforum_id: Union[int, str] = None,
        quote_text: str = None,
        quote_entities: List["types.MessageEntity"] = None,
        parse_mode: Optional["enums.ParseMode"] = None,
        schedule_date: datetime = None,
        protect_content: bool = None,
        allow_paid_broadcast: bool = None,
        message_effect_id: int = None,
        invert_media: bool = None,
        progress: Callable = None,
        progress_args: tuple = (),
    ) -> List["types.Message"]:
        """Send a group of photos, videos, audios or documents as an album.

        .. include:: /_includes/usable-by/users-bots.rst

        Parameters:
            chat_id (``int`` | ``str``):
                Unique identifier (int) or username (str) of the target chat.
                For your personal cloud (Saved Messages) you can simply use "me" or "self".
                For a contact that exists in your Telegram address book you can use his phone number (str).
                You can also use chat public link in form of *t.me/<username>* (str).

            media (List of :obj:`~pyrogram.types.InputMediaPhoto`, :obj:`~pyrogram.types.InputMediaVideo`, :obj:`~pyrogram.types.InputMediaAudio`, :obj:`~pyrogram.types.InputMediaDocument` and :obj:`~pyrogram.types.InputMediaAnimation`):
                A list describing the media to be sent, must include 2-10 items.
                Each item's media can be a local file path, a file-like object, an HTTP(S) URL or a file_id.

            disable_notification (``bool``, *optional*):
                Sends the message silently.
                Users will receive a notification with no sound.

            message_thread_id (``int``, *optional*):
                Unique identifier for the target message thread (topic) of the forum.
                For forum supergroups only.

            business_connection_id (``str``, *optional*):
                Business connection identifier.
                For business bots only.

            reply_to_message_id (``int``, *optional*):
                If the message is a reply, ID of the original message.

            reply_to_story_id (``int``, *optional*):
                Unique identifier for the target story.

            reply_to_chat_id (``int`` | ``str``, *optional*):
                Unique identifier for the origin chat.
                For reply to message from another chat.
                You can also use chat public link in form of *t.me/<username>* (str).

            reply_to_monoforum_id (``int`` | ``str``, *optional*):
                Unique identifier for the target user of monoforum.
                For reply to message from monoforum.
                For channel administrators only.

            quote_text (``str``, *optional*):
                Text to quote.
                For reply_to_message only.

            quote_entities (List of :obj:`~pyrogram.types.MessageEntity`, *optional*):
                List of special entities that appear in quote_text, which can be specified instead of *parse_mode*.
                For reply_to_message only.

            parse_mode (:obj:`~pyrogram.enums.ParseMode`, *optional*):
                By default, quote_text is parsed using both Markdown and HTML styles.
                You can combine both syntaxes together.
                For quote_text.

            schedule_date (:py:obj:`~datetime.datetime`, *optional*):
                Date when the message will be automatically sent.

            protect_content (``bool``, *optional*):
                Protects the contents of the sent message from forwarding and saving.

            allow_paid_broadcast (``bool``, *optional*):
                Pass True to allow the message to ignore regular broadcast limits for a small fee; for bots only.

            message_effect_id (``int`` ``64-bit``, *optional*):
                Unique identifier of the message effect to be added to the message; for private chats only.

            invert_media (``bool``, *optional*):
                Inverts the position of the media and caption.

            progress (``Callable``, *optional*):
                Pass a callback function to view the file transmission progress.
                The function must take *(current, total)* as positional arguments (look at Other Parameters below for a
                detailed description) and will be called back each time a new file chunk has been successfully
                transmitted.

            progress_args (``tuple``, *optional*):
                Extra custom arguments for the progress callback function.
                You can pass anything you need to be available in the progress callback scope; for example, a Message
                object or a Client instance in order to edit the message with the updated progress status.

        Other Parameters:
            current (``int``):
                The amount of bytes transmitted so far.

            total (``int``):
                The total size of the file.

            *args (``tuple``, *optional*):
                Extra custom arguments as defined in the ``progress_args`` parameter.
                You can either keep ``*args`` or add every single extra argument in your function signature.

        Returns:
            List of :obj:`~pyrogram.types.Message`: On success, a list of the sent messages is returned.

        Raises:
            ValueError: If an item of ``media`` is not one of the supported InputMedia* types.

        Example:
            .. code-block:: python

                from pyrogram.types import InputMediaPhoto, InputMediaVideo

                await app.send_media_group(
                    "me",
                    [
                        InputMediaPhoto("photo1.jpg"),
                        InputMediaPhoto("photo2.jpg", caption="photo caption"),
                        InputMediaVideo("video.mp4", caption="video caption")
                    ]
                )
        """
        reply_to = await utils.get_reply_to(
            client=self,
            chat_id=chat_id,
            reply_to_message_id=reply_to_message_id,
            reply_to_story_id=reply_to_story_id,
            message_thread_id=message_thread_id,
            reply_to_chat_id=reply_to_chat_id,
            reply_to_monoforum_id=reply_to_monoforum_id,
            quote_text=quote_text,
            quote_entities=quote_entities,
            parse_mode=parse_mode,
        )

        # The target peer is the same for every upload and for the final
        # request, so resolve it once; an invalid chat now fails before any
        # file is uploaded.
        peer = await self.resolve_peer(chat_id)

        multi_media = []
        for item in media:
            input_media = await _prepare_group_item(self, peer, item, progress, progress_args)
            multi_media.append(
                raw.types.InputSingleMedia(
                    media=input_media,
                    random_id=self.rnd_id(),
                    **(await utils.parse_text_entities(self, item.caption, item.parse_mode, item.caption_entities)),
                ),
            )

        rpc = raw.functions.messages.SendMultiMedia(
            peer=peer,
            multi_media=multi_media,
            silent=disable_notification or None,
            reply_to=reply_to,
            schedule_date=utils.datetime_to_timestamp(schedule_date),
            noforwards=protect_content,
            allow_paid_floodskip=allow_paid_broadcast,
            effect=message_effect_id,
            invert_media=invert_media,
        )

        if business_connection_id is not None:
            # Business bots must wrap the request in their connection.
            r = await self.invoke(
                raw.functions.InvokeWithBusinessConnection(
                    connection_id=business_connection_id,
                    query=rpc
                ),
                sleep_threshold=60,
            )
        else:
            r = await self.invoke(rpc, sleep_threshold=60)

        # Only the "new message" updates carry the messages that were just sent.
        sent_messages = [
            update.message
            for update in r.updates
            if isinstance(
                update,
                (
                    raw.types.UpdateNewMessage,
                    raw.types.UpdateNewChannelMessage,
                    raw.types.UpdateNewScheduledMessage,
                    raw.types.UpdateBotNewBusinessMessage,
                ),
            )
        ]

        return await utils.parse_messages(
            self,
            raw.types.messages.Messages(
                messages=sent_messages,
                users=r.users,
                chats=r.chats,
            ),
            business_connection_id=business_connection_id,
        )