# Pyrofork - Telegram MTProto API Client Library for Python # Copyright (C) 2017-present Dan # Copyright (C) 2022-present Mayuri-Chan # # This file is part of Pyrofork. # # Pyrofork is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pyrofork is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Pyrofork. If not, see . import logging import os import re from datetime import datetime from pymediainfo import MediaInfo from typing import Union, List, Optional import pyrogram from pyrogram import enums from pyrogram import raw from pyrogram import types from pyrogram import utils from pyrogram.file_id import FileType log = logging.getLogger(__name__) class SendMediaGroup: # TODO: Add progress parameter async def send_media_group( self: "pyrogram.Client", chat_id: Union[int, str], media: List[Union[ "types.InputMediaPhoto", "types.InputMediaVideo", "types.InputMediaAudio", "types.InputMediaDocument", "types.InputMediaAnimation" ]], disable_notification: bool = None, message_thread_id: int = None, reply_to_message_id: int = None, reply_to_story_id: int = None, reply_to_chat_id: Union[int, str] = None, quote_text: str = None, quote_entities: List["types.MessageEntity"] = None, parse_mode: Optional["enums.ParseMode"] = None, schedule_date: datetime = None, protect_content: bool = None, ) -> List["types.Message"]: """Send a group of photos or videos as an album. .. include:: /_includes/usable-by/users-bots.rst Parameters: chat_id (``int`` | ``str``): Unique identifier (int) or username (str) of the target chat. For your personal cloud (Saved Messages) you can simply use "me" or "self". For a contact that exists in your Telegram address book you can use his phone number (str). You can also use chat public link in form of *t.me/* (str). media (List of :obj:`~pyrogram.types.InputMediaPhoto`, :obj:`~pyrogram.types.InputMediaVideo`, :obj:`~pyrogram.types.InputMediaAudio`, :obj:`~pyrogram.types.InputMediaDocument` and :obj:`~pyrogram.types.InputMediaAnimation`): A list describing photos and videos to be sent, must include 2–10 items. disable_notification (``bool``, *optional*): Sends the message silently. Users will receive a notification with no sound. message_thread_id (``int``, *optional*): Unique identifier for the target message thread (topic) of the forum. for forum supergroups only. reply_to_message_id (``int``, *optional*): If the message is a reply, ID of the original message. reply_to_story_id (``int``, *optional*): Unique identifier for the target story. reply_to_chat_id (``int`` | ``str``, *optional*): Unique identifier for the origin chat. for reply to message from another chat. You can also use chat public link in form of *t.me/* (str). quote_text (``str``, *optional*): Text to quote. for reply_to_message only. quote_entities (List of :obj:`~pyrogram.types.MessageEntity`, *optional*): List of special entities that appear in quote_text, which can be specified instead of *parse_mode*. for reply_to_message only. parse_mode (:obj:`~pyrogram.enums.ParseMode`, *optional*): By default, quote_text are parsed using both Markdown and HTML styles. You can combine both syntaxes together. For quote_text. schedule_date (:py:obj:`~datetime.datetime`, *optional*): Date when the message will be automatically sent. protect_content (``bool``, *optional*): Protects the contents of the sent message from forwarding and saving. Returns: List of :obj:`~pyrogram.types.Message`: On success, a list of the sent messages is returned. Example: .. code-block:: python from pyrogram.types import InputMediaPhoto, InputMediaVideo await app.send_media_group( "me", [ InputMediaPhoto("photo1.jpg"), InputMediaPhoto("photo2.jpg", caption="photo caption"), InputMediaVideo("video.mp4", caption="video caption") ] ) """ multi_media = [] reply_to = await utils.get_reply_to( client=self, chat_id=chat_id, reply_to_message_id=reply_to_message_id, reply_to_story_id=reply_to_story_id, message_thread_id=message_thread_id, reply_to_chat_id=reply_to_chat_id, quote_text=quote_text, quote_entities=quote_entities, parse_mode=parse_mode ) for i in media: if isinstance(i, types.InputMediaPhoto): if isinstance(i.media, str): if os.path.isfile(i.media): media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedPhoto( file=await self.save_file(i.media), spoiler=i.has_spoiler ) ) ) media = raw.types.InputMediaPhoto( id=raw.types.InputPhoto( id=media.photo.id, access_hash=media.photo.access_hash, file_reference=media.photo.file_reference ), spoiler=i.has_spoiler ) elif re.match("^https?://", i.media): media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaPhotoExternal( url=i.media, spoiler=i.has_spoiler ) ) ) media = raw.types.InputMediaPhoto( id=raw.types.InputPhoto( id=media.photo.id, access_hash=media.photo.access_hash, file_reference=media.photo.file_reference ), spoiler=i.has_spoiler ) else: media = utils.get_input_media_from_file_id(i.media, FileType.PHOTO) else: media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedPhoto( file=await self.save_file(i.media), spoiler=i.has_spoiler ) ) ) media = raw.types.InputMediaPhoto( id=raw.types.InputPhoto( id=media.photo.id, access_hash=media.photo.access_hash, file_reference=media.photo.file_reference ), spoiler=i.has_spoiler ) elif ( isinstance(i, types.InputMediaVideo) or isinstance(i, types.InputMediaAnimation) ): if isinstance(i.media, str): is_animation = False if os.path.isfile(i.media): try: videoInfo = MediaInfo.parse(i.media) except OSError: is_animation = True if isinstance(i, types.InputMediaAnimation) else False else: if not any([track.track_type == 'Audio' for track in videoInfo.tracks]): is_animation = True attributes = [ raw.types.DocumentAttributeVideo( supports_streaming=True if is_animation else (i.supports_streaming or None), duration=i.duration, w=i.width, h=i.height ), raw.types.DocumentAttributeFilename(file_name=os.path.basename(i.media)) ] if is_animation: attributes.append(raw.types.DocumentAttributeAnimated()) media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedDocument( file=await self.save_file(i.media), thumb=await self.save_file(i.thumb), spoiler=i.has_spoiler, mime_type=self.guess_mime_type(i.media) or "video/mp4", nosound_video=is_animation, attributes=attributes ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ), spoiler=i.has_spoiler ) elif re.match("^https?://", i.media): media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaDocumentExternal( url=i.media, spoiler=i.has_spoiler ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ), spoiler=i.has_spoiler ) else: media = utils.get_input_media_from_file_id(i.media, FileType.VIDEO) else: media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedDocument( file=await self.save_file(i.media), thumb=await self.save_file(i.thumb), spoiler=i.has_spoiler, mime_type=self.guess_mime_type(getattr(i.media, "name", "video.mp4")) or "video/mp4", attributes=[ raw.types.DocumentAttributeVideo( supports_streaming=i.supports_streaming or None, duration=i.duration, w=i.width, h=i.height ), raw.types.DocumentAttributeFilename(file_name=getattr(i.media, "name", "video.mp4")) ] ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ), spoiler=i.has_spoiler ) elif isinstance(i, types.InputMediaAudio): if isinstance(i.media, str): if os.path.isfile(i.media): media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedDocument( mime_type=self.guess_mime_type(i.media) or "audio/mpeg", file=await self.save_file(i.media), thumb=await self.save_file(i.thumb), attributes=[ raw.types.DocumentAttributeAudio( duration=i.duration, performer=i.performer, title=i.title ), raw.types.DocumentAttributeFilename(file_name=os.path.basename(i.media)) ] ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ) ) elif re.match("^https?://", i.media): media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaDocumentExternal( url=i.media ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ) ) else: media = utils.get_input_media_from_file_id(i.media, FileType.AUDIO) else: media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedDocument( mime_type=self.guess_mime_type(getattr(i.media, "name", "audio.mp3")) or "audio/mpeg", file=await self.save_file(i.media), thumb=await self.save_file(i.thumb), attributes=[ raw.types.DocumentAttributeAudio( duration=i.duration, performer=i.performer, title=i.title ), raw.types.DocumentAttributeFilename(file_name=getattr(i.media, "name", "audio.mp3")) ] ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ) ) elif isinstance(i, types.InputMediaDocument): if isinstance(i.media, str): if os.path.isfile(i.media): media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedDocument( mime_type=self.guess_mime_type(i.media) or "application/zip", file=await self.save_file(i.media), thumb=await self.save_file(i.thumb), attributes=[ raw.types.DocumentAttributeFilename(file_name=os.path.basename(i.media)) ] ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ) ) elif re.match("^https?://", i.media): media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaDocumentExternal( url=i.media ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ) ) else: media = utils.get_input_media_from_file_id(i.media, FileType.DOCUMENT) else: media = await self.invoke( raw.functions.messages.UploadMedia( peer=await self.resolve_peer(chat_id), media=raw.types.InputMediaUploadedDocument( mime_type=self.guess_mime_type( getattr(i.media, "name", "file.zip") ) or "application/zip", file=await self.save_file(i.media), thumb=await self.save_file(i.thumb), attributes=[ raw.types.DocumentAttributeFilename(file_name=getattr(i.media, "name", "file.zip")) ] ) ) ) media = raw.types.InputMediaDocument( id=raw.types.InputDocument( id=media.document.id, access_hash=media.document.access_hash, file_reference=media.document.file_reference ) ) else: raise ValueError(f"{i.__class__.__name__} is not a supported type for send_media_group") multi_media.append( raw.types.InputSingleMedia( media=media, random_id=self.rnd_id(), **await self.parser.parse(i.caption, i.parse_mode) ) ) r = await self.invoke( raw.functions.messages.SendMultiMedia( peer=await self.resolve_peer(chat_id), multi_media=multi_media, silent=disable_notification or None, reply_to=reply_to, schedule_date=utils.datetime_to_timestamp(schedule_date), noforwards=protect_content ), sleep_threshold=60 ) return await utils.parse_messages( self, raw.types.messages.Messages( messages=[m.message for m in filter( lambda u: isinstance(u, (raw.types.UpdateNewMessage, raw.types.UpdateNewChannelMessage, raw.types.UpdateNewScheduledMessage)), r.updates )], users=r.users, chats=r.chats ) )