pyrofork/pyrogram/methods/messages/send_media_group.py
wulan17 e3e84d91e8
Pyrofork: Add support to reply message in another chat
Signed-off-by: wulan17 <wulan17@nusantararom.org>
2023-11-25 21:51:41 +07:00

464 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Pyrofork - Telegram MTProto API Client Library for Python
# Copyright (C) 2017-present Dan <https://github.com/delivrance>
# Copyright (C) 2022-present Mayuri-Chan <https://github.com/Mayuri-Chan>
#
# This file is part of Pyrofork.
#
# Pyrofork is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrofork is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrofork. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import re
from datetime import datetime
from pymediainfo import MediaInfo
from typing import Union, List
import pyrogram
from pyrogram import raw
from pyrogram import types
from pyrogram import utils
from pyrogram.file_id import FileType
# Module-level logger, registered under this module's ``__name__``.
log = logging.getLogger(__name__)
class SendMediaGroup:
    # TODO: Add progress parameter
    async def send_media_group(
        self: "pyrogram.Client",
        chat_id: Union[int, str],
        media: List[Union[
            "types.InputMediaPhoto",
            "types.InputMediaVideo",
            "types.InputMediaAudio",
            "types.InputMediaDocument",
            "types.InputMediaAnimation"
        ]],
        disable_notification: bool = None,
        message_thread_id: int = None,
        reply_to_message_id: int = None,
        reply_to_story_id: int = None,
        reply_to_chat_id: int = None,
        quote_text: str = None,
        schedule_date: datetime = None,
        protect_content: bool = None,
    ) -> List["types.Message"]:
        """Send a group of photos or videos as an album.

        .. include:: /_includes/usable-by/users-bots.rst

        Parameters:
            chat_id (``int`` | ``str``):
                Unique identifier (int) or username (str) of the target chat.
                For your personal cloud (Saved Messages) you can simply use "me" or "self".
                For a contact that exists in your Telegram address book you can use his phone number (str).

            media (List of :obj:`~pyrogram.types.InputMediaPhoto`, :obj:`~pyrogram.types.InputMediaVideo`, :obj:`~pyrogram.types.InputMediaAudio`, :obj:`~pyrogram.types.InputMediaDocument` and :obj:`~pyrogram.types.InputMediaAnimation`):
                A list describing photos and videos to be sent, must include 2-10 items.
                Each item's ``media`` may be a local file path, an ``http(s)://`` URL,
                a file id, or a non-string object that is handed to ``save_file``.

            disable_notification (``bool``, *optional*):
                Sends the message silently.
                Users will receive a notification with no sound.

            message_thread_id (``int``, *optional*):
                Unique identifier for the target message thread (topic) of the forum.
                For forum supergroups only.

            reply_to_message_id (``int``, *optional*):
                If the message is a reply, ID of the original message.

            reply_to_story_id (``int``, *optional*):
                Unique identifier for the target story.
                When given, it takes precedence over *reply_to_message_id* /
                *message_thread_id*.

            reply_to_chat_id (``int``, *optional*):
                Unique identifier for the origin chat.
                For reply to message from another chat.
                Only used together with *reply_to_message_id* or *message_thread_id*.

            quote_text (``str``, *optional*):
                Text to quote.
                For reply_to_message only.

            schedule_date (:py:obj:`~datetime.datetime`, *optional*):
                Date when the message will be automatically sent.

            protect_content (``bool``, *optional*):
                Protects the contents of the sent message from forwarding and saving.

        Returns:
            List of :obj:`~pyrogram.types.Message`: On success, a list of the sent messages is returned.

        Raises:
            ValueError: If an item of *media* is not one of the supported input media types.

        Example:
            .. code-block:: python

                from pyrogram.types import InputMediaPhoto, InputMediaVideo

                await app.send_media_group(
                    "me",
                    [
                        InputMediaPhoto("photo1.jpg"),
                        InputMediaPhoto("photo2.jpg", caption="photo caption"),
                        InputMediaVideo("video.mp4", caption="video caption")
                    ]
                )
        """
        reply_to = None

        if reply_to_message_id or message_thread_id:
            reply_to_chat = None
            if reply_to_chat_id is not None:
                reply_to_chat = await self.resolve_peer(reply_to_chat_id)

            reply_to = types.InputReplyToMessage(
                reply_to_message_id=reply_to_message_id,
                message_thread_id=message_thread_id,
                reply_to_chat=reply_to_chat,
                quote_text=quote_text
            )

        # The target peer is the same for every upload and for the final
        # request, so resolve it a single time instead of once per item.
        peer = await self.resolve_peer(chat_id)

        if reply_to_story_id:
            # A story reply replaces any message reply built above.
            reply_to = types.InputReplyToStory(user_id=peer, story_id=reply_to_story_id)

        multi_media = []

        for item in media:
            if isinstance(item, types.InputMediaPhoto):
                input_media = await _build_photo_media(self, peer, item)
            elif isinstance(item, (types.InputMediaVideo, types.InputMediaAnimation)):
                input_media = await _build_video_media(self, peer, item)
            elif isinstance(item, types.InputMediaAudio):
                input_media = await _build_file_media(
                    self, peer, item,
                    file_type=FileType.AUDIO,
                    default_name="audio.mp3",
                    default_mime="audio/mpeg",
                    leading_attributes=[
                        raw.types.DocumentAttributeAudio(
                            duration=item.duration,
                            performer=item.performer,
                            title=item.title
                        )
                    ]
                )
            elif isinstance(item, types.InputMediaDocument):
                input_media = await _build_file_media(
                    self, peer, item,
                    file_type=FileType.DOCUMENT,
                    default_name="file.zip",
                    default_mime="application/zip",
                    leading_attributes=[]
                )
            else:
                raise ValueError(f"{item.__class__.__name__} is not a supported type for send_media_group")

            multi_media.append(
                raw.types.InputSingleMedia(
                    media=input_media,
                    random_id=self.rnd_id(),
                    **await self.parser.parse(item.caption, item.parse_mode)
                )
            )

        r = await self.invoke(
            raw.functions.messages.SendMultiMedia(
                peer=peer,
                multi_media=multi_media,
                silent=disable_notification or None,
                reply_to=reply_to,
                schedule_date=utils.datetime_to_timestamp(schedule_date),
                noforwards=protect_content
            ),
            sleep_threshold=60
        )

        # Only the "new message" style updates carry the messages just sent.
        new_message_updates = (
            raw.types.UpdateNewMessage,
            raw.types.UpdateNewChannelMessage,
            raw.types.UpdateNewScheduledMessage
        )

        return await utils.parse_messages(
            self,
            raw.types.messages.Messages(
                messages=[u.message for u in r.updates if isinstance(u, new_message_updates)],
                users=r.users,
                chats=r.chats
            )
        )


# Matches media strings that should be fetched by Telegram from a remote URL.
_HTTP_URL = re.compile(r"^https?://")


async def _upload_photo(client, peer, input_media, spoiler=None):
    """Send *input_media* through ``messages.UploadMedia`` and re-wrap the resulting photo by reference."""
    uploaded = await client.invoke(
        raw.functions.messages.UploadMedia(peer=peer, media=input_media)
    )

    return raw.types.InputMediaPhoto(
        id=raw.types.InputPhoto(
            id=uploaded.photo.id,
            access_hash=uploaded.photo.access_hash,
            file_reference=uploaded.photo.file_reference
        ),
        spoiler=spoiler
    )


async def _upload_document(client, peer, input_media, spoiler=None):
    """Send *input_media* through ``messages.UploadMedia`` and re-wrap the resulting document by reference."""
    uploaded = await client.invoke(
        raw.functions.messages.UploadMedia(peer=peer, media=input_media)
    )

    return raw.types.InputMediaDocument(
        id=raw.types.InputDocument(
            id=uploaded.document.id,
            access_hash=uploaded.document.access_hash,
            file_reference=uploaded.document.file_reference
        ),
        spoiler=spoiler
    )


async def _build_photo_media(client, peer, item):
    """Turn an ``InputMediaPhoto`` item into a raw input media usable in ``InputSingleMedia``."""
    source = item.media
    spoiler = item.has_spoiler

    # Existing local paths and non-string sources are both uploaded with save_file.
    if not isinstance(source, str) or os.path.isfile(source):
        to_upload = raw.types.InputMediaUploadedPhoto(
            file=await client.save_file(source),
            spoiler=spoiler
        )
    elif _HTTP_URL.match(source):
        to_upload = raw.types.InputMediaPhotoExternal(url=source, spoiler=spoiler)
    else:
        # Any other string is treated as a file id.
        return utils.get_input_media_from_file_id(source, FileType.PHOTO)

    return await _upload_photo(client, peer, to_upload, spoiler)


async def _build_video_media(client, peer, item):
    """Turn an ``InputMediaVideo`` / ``InputMediaAnimation`` item into a raw input media."""
    source = item.media
    spoiler = item.has_spoiler
    # NOTE(review): InputMediaAnimation may not define ``supports_streaming``
    # (confirm against the types package); getattr avoids an AttributeError there.
    supports_streaming = getattr(item, "supports_streaming", None) or None

    if isinstance(source, str):
        if os.path.isfile(source):
            # A local video without any audio track is sent as an animation.
            video_info = MediaInfo.parse(source)
            is_animation = not any(track.track_type == "Audio" for track in video_info.tracks)

            attributes = [
                raw.types.DocumentAttributeVideo(
                    supports_streaming=True if is_animation else supports_streaming,
                    duration=item.duration,
                    w=item.width,
                    h=item.height
                ),
                raw.types.DocumentAttributeFilename(file_name=os.path.basename(source))
            ]
            # Append conditionally: a ``None`` entry in the attribute list
            # cannot be serialised and broke videos that have audio.
            if is_animation:
                attributes.append(raw.types.DocumentAttributeAnimated())

            to_upload = raw.types.InputMediaUploadedDocument(
                file=await client.save_file(source),
                thumb=await client.save_file(item.thumb),
                spoiler=spoiler,
                mime_type=client.guess_mime_type(source) or "video/mp4",
                nosound_video=is_animation,
                attributes=attributes
            )
        elif _HTTP_URL.match(source):
            to_upload = raw.types.InputMediaDocumentExternal(url=source, spoiler=spoiler)
        else:
            # Any other string is treated as a file id.
            return utils.get_input_media_from_file_id(source, FileType.VIDEO)
    else:
        # Non-string source: no audio-track probing is done on this path.
        file_name = getattr(source, "name", "video.mp4")
        to_upload = raw.types.InputMediaUploadedDocument(
            file=await client.save_file(source),
            thumb=await client.save_file(item.thumb),
            spoiler=spoiler,
            mime_type=client.guess_mime_type(file_name) or "video/mp4",
            attributes=[
                raw.types.DocumentAttributeVideo(
                    supports_streaming=supports_streaming,
                    duration=item.duration,
                    w=item.width,
                    h=item.height
                ),
                raw.types.DocumentAttributeFilename(file_name=file_name)
            ]
        )

    return await _upload_document(client, peer, to_upload, spoiler)


async def _build_file_media(client, peer, item, *, file_type, default_name, default_mime, leading_attributes):
    """Turn an audio or generic document item into a raw input media.

    ``leading_attributes`` are placed before the file-name attribute;
    ``default_name`` / ``default_mime`` are the fallbacks used when the source
    has no ``name`` or its MIME type cannot be guessed.
    """
    source = item.media

    if isinstance(source, str):
        if os.path.isfile(source):
            mime_source = source
            file_name = os.path.basename(source)
        elif _HTTP_URL.match(source):
            return await _upload_document(
                client, peer,
                raw.types.InputMediaDocumentExternal(url=source)
            )
        else:
            # Any other string is treated as a file id.
            return utils.get_input_media_from_file_id(source, file_type)
    else:
        mime_source = file_name = getattr(source, "name", default_name)

    to_upload = raw.types.InputMediaUploadedDocument(
        mime_type=client.guess_mime_type(mime_source) or default_mime,
        file=await client.save_file(source),
        thumb=await client.save_file(item.thumb),
        attributes=[
            *leading_attributes,
            raw.types.DocumentAttributeFilename(file_name=file_name)
        ]
    )

    return await _upload_document(client, peer, to_upload)