# Pyrogram - Telegram MTProto API Client Library for Python # Copyright (C) 2017-present Dan # # This file is part of Pyrogram. # # Pyrogram is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pyrogram is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Pyrogram. If not, see . import os import re import io import asyncio import io import pyrogram from pyrogram import raw from pyrogram import types from pyrogram import utils from pyrogram.errors import RPCError, MediaEmpty from pyrogram.file_id import FileType from .inline_session import get_session class EditInlineMedia: MAX_RETRIES = 3 async def edit_inline_media( self: "pyrogram.Client", inline_message_id: str, media: "types.InputMedia", reply_markup: "types.InlineKeyboardMarkup" = None ) -> bool: """Edit inline animation, audio, document, photo or video messages. When the inline message is edited, a new file can't be uploaded. Use a previously uploaded file via its file_id or specify a URL. Parameters: inline_message_id (``str``): Required if *chat_id* and *message_id* are not specified. Identifier of the inline message. media (:obj:`~pyrogram.types.InputMedia`): One of the InputMedia objects describing an animation, audio, document, photo or video. reply_markup (:obj:`~pyrogram.types.InlineKeyboardMarkup`, *optional*): An InlineKeyboardMarkup object. Returns: ``bool``: On success, True is returned. Example: .. code-block:: python from pyrogram.types import InputMediaPhoto, InputMediaVideo, InputMediaAudio # Bots only # Replace the current media with a local photo await app.edit_inline_media(inline_message_id, InputMediaPhoto("new_photo.jpg")) # Replace the current media with a local video await app.edit_inline_media(inline_message_id, InputMediaVideo("new_video.mp4")) # Replace the current media with a local audio await app.edit_inline_media(inline_message_id, InputMediaAudio("new_audio.mp3")) """ caption = media.caption parse_mode = media.parse_mode is_photo = isinstance(media, types.InputMediaPhoto) is_bytes_io = isinstance(media.media, io.BytesIO) is_uploaded_file = is_bytes_io or os.path.isfile(media.media) is_external_url = not is_uploaded_file and re.match("^https?://", media.media) if is_bytes_io and not hasattr(media.media, "name"): media.media.name = "media" if is_uploaded_file: filename_attribute = [ raw.types.DocumentAttributeFilename( file_name=media.media.name if is_bytes_io else os.path.basename(media.media) ) ] else: filename_attribute = [] if is_photo: if is_uploaded_file: media = raw.types.InputMediaUploadedPhoto( file=await self.save_file(media.media) ) elif is_external_url: media = raw.types.InputMediaPhotoExternal( url=media.media ) else: media = utils.get_input_media_from_file_id(media.media, FileType.PHOTO) elif isinstance(media, types.InputMediaVideo): if is_uploaded_file: media = raw.types.InputMediaUploadedDocument( mime_type=(None if is_bytes_io else self.guess_mime_type(media.media)) or "video/mp4", thumb=await self.save_file(media.thumb), file=await self.save_file(media.media), attributes=[ raw.types.DocumentAttributeVideo( supports_streaming=media.supports_streaming or None, duration=media.duration, w=media.width, h=media.height ) ] + filename_attribute ) elif is_external_url: media = raw.types.InputMediaDocumentExternal( url=media.media ) else: media = utils.get_input_media_from_file_id(media.media, FileType.VIDEO) elif isinstance(media, types.InputMediaAudio): if is_uploaded_file: media = raw.types.InputMediaUploadedDocument( mime_type=(None if is_bytes_io else self.guess_mime_type(media.media)) or "audio/mpeg", thumb=await self.save_file(media.thumb), file=await self.save_file(media.media), attributes=[ raw.types.DocumentAttributeAudio( duration=media.duration, performer=media.performer, title=media.title ) ] + filename_attribute ) elif is_external_url: media = raw.types.InputMediaDocumentExternal( url=media.media ) else: media = utils.get_input_media_from_file_id(media.media, FileType.AUDIO) elif isinstance(media, types.InputMediaAnimation): if is_uploaded_file: media = raw.types.InputMediaUploadedDocument( mime_type=(None if is_bytes_io else self.guess_mime_type(media.media)) or "video/mp4", thumb=await self.save_file(media.thumb), file=await self.save_file(media.media), attributes=[ raw.types.DocumentAttributeVideo( supports_streaming=True, duration=media.duration, w=media.width, h=media.height ), raw.types.DocumentAttributeAnimated() ] + filename_attribute, nosound_video=True ) elif is_external_url: media = raw.types.InputMediaDocumentExternal( url=media.media ) else: media = utils.get_input_media_from_file_id(media.media, FileType.ANIMATION) elif isinstance(media, types.InputMediaDocument): if is_uploaded_file: media = raw.types.InputMediaUploadedDocument( mime_type=(None if is_bytes_io else self.guess_mime_type(media.media)) or "application/zip", thumb=await self.save_file(media.thumb), file=await self.save_file(media.media), attributes=filename_attribute, force_file=True ) elif is_external_url: media = raw.types.InputMediaDocumentExternal( url=media.media ) else: media = utils.get_input_media_from_file_id(media.media, FileType.DOCUMENT) unpacked = utils.unpack_inline_message_id(inline_message_id) dc_id = unpacked.dc_id session = await get_session(self, dc_id) if is_uploaded_file: uploaded_media = await self.invoke( raw.functions.messages.UploadMedia( peer=raw.types.InputPeerSelf(), media=media ) ) actual_media = raw.types.InputMediaPhoto( id=raw.types.InputPhoto( id=uploaded_media.photo.id, access_hash=uploaded_media.photo.access_hash, file_reference=uploaded_media.photo.file_reference ) ) if is_photo else raw.types.InputMediaDocument( id=raw.types.InputDocument( id=uploaded_media.document.id, access_hash=uploaded_media.document.access_hash, file_reference=uploaded_media.document.file_reference ) ) else: actual_media = media for i in range(self.MAX_RETRIES): try: return await session.invoke( raw.functions.messages.EditInlineBotMessage( id=unpacked, media=actual_media, reply_markup=await reply_markup.write(self) if reply_markup else None, **await self.parser.parse(caption, parse_mode) ), sleep_threshold=self.sleep_threshold ) except RPCError as e: if i == self.MAX_RETRIES - 1: raise if isinstance(e, MediaEmpty): # Must wait due to a server race condition await asyncio.sleep(1)