""" * @author yasir * @date 2022-12-01 09:12:27 * @lastModified 2022-12-01 09:32:31 * @projectName MissKatyPyro * Copyright @YasirPedia All rights reserved """ import os, logging, json, shutil, asyncio, time from misskaty import app from PIL import Image from pyrogram import filters from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from misskaty.vars import COMMAND_HANDLER from datetime import datetime from hachoir.metadata import extractMetadata from hachoir.parser import createParser from misskaty.helper.ytdl_helper import random_char, DownLoadFile from misskaty.helper.human_read import get_readable_file_size from misskaty.plugins.dev import shell_exec from misskaty.core.decorator.errors import capture_err from misskaty.helper.pyro_progress import progress_for_pyrogram logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) LOGGER = logging.getLogger(__name__) user_time = {} @app.on_message(filters.command(["ytdown"], COMMAND_HANDLER) & ~filters.channel) @capture_err async def ytdown(_, message): if len(message.command) == 1: return await message.reply( f"Gunakan command /{message.command[0]} YT_LINK untuk download video dengan YT-DLP." ) userLastDownloadTime = user_time.get(message.chat.id) try: if userLastDownloadTime > datetime.now(): wait_time = round( (userLastDownloadTime - datetime.now()).total_seconds() / 60, 2 ) await message.reply_text(f"Wait {wait_time} Minutes before next request..") return except: pass url = message.command[1] command_to_exec = f"yt-dlp --no-warnings --youtube-skip-dash-manifest -j {url}" t_response = (await shell_exec(command_to_exec))[0] if "ERROR" in t_response: await message.reply_text(t_response, quote=True, disable_web_page_preview=True) return False inline_keyboard = [] if t_response: x_reponse = t_response if "\n" in x_reponse: x_reponse, _ = x_reponse.split("\n") response_json = json.loads(x_reponse) randem = random_char(5) if not os.path.exists("./YT_Down"): os.makedirs("./YT_Down") save_ytdl_json_path = f"YT_Down/{str(message.from_user.id)}{randem}.json" with open(save_ytdl_json_path, "w", encoding="utf8") as outfile: json.dump(response_json, outfile, ensure_ascii=False) duration = None if "duration" in response_json: duration = response_json["duration"] if "formats" in response_json: for formats in response_json["formats"]: format_id = formats.get("format_id") format_string = formats.get("format_note") if format_string in ["ultralow", "low", "medium"]: continue if format_string is None: format_string = formats.get("format") format_ext = formats.get("ext") if format_ext == "mhtml": continue if formats.get("filesize"): size = formats["filesize"] elif formats.get("filesize_approx"): size = formats["filesize_approx"] else: size = 0 cb_string_video = f"ytdl|video|{format_id}|{format_ext}|{randem}" cb_string_file = f"ytdl|file|{format_id}|{format_ext}|{randem}" if format_string and "audio only" not in format_string: ikeyboard = [ InlineKeyboardButton( f"🎬 {format_string} {format_ext} {get_readable_file_size(size)} ", callback_data=(cb_string_video).encode("UTF-8"), ), InlineKeyboardButton( f"📄 {format_string} {format_ext} {get_readable_file_size(size)} ", callback_data=(cb_string_file).encode("UTF-8"), ), ] else: # special weird case :\ ikeyboard = [ InlineKeyboardButton( "SVideo [" + "] ( " + get_readable_file_size(size) + " )", callback_data=(cb_string_video).encode("UTF-8"), ), InlineKeyboardButton( "DFile [" + "] ( " + get_readable_file_size(size) + " )", callback_data=(cb_string_file).encode("UTF-8"), ), ] inline_keyboard.append(ikeyboard) if duration is not None: cb_string_64 = f"ytdl|audio|64k|mp3|{randem}" cb_string_128 = f"ytdl|audio|128k|mp3|{randem}" cb_string = f"ytdl|audio|320k|mp3|{randem}" inline_keyboard.extend( ( [ InlineKeyboardButton( "MP3 " + "(" + "64 kbps" + ")", callback_data=cb_string_64.encode("UTF-8"), ), InlineKeyboardButton( "MP3 " + "(" + "128 kbps" + ")", callback_data=cb_string_128.encode("UTF-8"), ), ], [ InlineKeyboardButton( "MP3 " + "(" + "320 kbps" + ")", callback_data=cb_string.encode("UTF-8"), ) ], ) ) else: format_id = response_json["format_id"] format_ext = response_json["ext"] cb_string_file = f"ytdl|file|{format_id}|{format_ext}|{randem}" cb_string_video = f'ytdl|{"video"}|{format_id}|{format_ext}|{randem}' inline_keyboard.append( [ InlineKeyboardButton( "SVideo", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "DFile", callback_data=(cb_string_file).encode("UTF-8") ), ] ) cb_string_file = f'{"file"}={format_id}={format_ext}' cb_string_video = f'{"video"}={format_id}={format_ext}' inline_keyboard.append( [ InlineKeyboardButton( "video", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "file", callback_data=(cb_string_file).encode("UTF-8") ), ] ) reply_markup = InlineKeyboardMarkup(inline_keyboard) thumbnail = "https://uxwing.com/wp-content/themes/uxwing/download/signs-and-symbols/no-video-icon.png" thumbnail_image = "https://uxwing.com/wp-content/themes/uxwing/download/signs-and-symbols/no-video-icon.png" if "thumbnail" in response_json and response_json["thumbnail"] is not None: thumbnail = response_json["thumbnail"] thumbnail_image = response_json["thumbnail"] thumb_image_path = DownLoadFile( thumbnail_image, f"YT_Down/{str(message.from_user.id)}{randem}.jpg", 128, None, "Trying to download..", message.id, message.chat.id, ) # bot, await message.reply_photo( photo=thumb_image_path, quote=True, caption=f"Select the desired format: file size might be approximate", reply_markup=reply_markup, ) else: cb_string_file = f'{"file"}={"LFO"}={"NONE"}' cb_string_video = f'{"video"}={"OFL"}={"ENON"}' inline_keyboard.append( [ InlineKeyboardButton( "SVideo", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "DFile", callback_data=(cb_string_file).encode("UTF-8") ), ] ) reply_markup = InlineKeyboardMarkup(inline_keyboard) await message.reply_photo( photo="https://telegra.ph/file/ce37f8203e1903feed544.png", quote=True, caption=f"""Select the desired format: file size might be approximate""", reply_markup=reply_markup, reply_to_message_id=message.id, ) @app.on_callback_query(filters.regex(r"^ytdl|")) async def youtube_dl_call_back(bot, update): cb_data = update.data usr = update.message.reply_to_message if update.from_user.id != usr.from_user.id: return await update.answer("⚠️ Akses Denied!", True) # youtube_dl extractors _, tg_send_type, youtube_dl_format, youtube_dl_ext, ranom = cb_data.split("|") thumb_image_path = f"YT_Down/{str(update.from_user.id)}{ranom}.jpg" save_ytdl_json_path = f"YT_Down/{str(update.from_user.id)}{ranom}.json" try: with open(save_ytdl_json_path, "r", encoding="utf8") as f: response_json = json.load(f) except FileNotFoundError: await update.message.delete() return False custom_file_name = ( f"{str(response_json.get('title'))}_{youtube_dl_format}.{youtube_dl_ext}" ) custom_file_name = "%(title,fulltitle,alt_title)s %(height& |)s%(height|)s%(height&p|)s%(fps|)s%(fps&fps|)s%(tbr& |)s%(tbr|)d.%(ext)s" youtube_dl_url = update.message.reply_to_message.text.split(" ", 1)[1] await update.message.edit_caption("Trying to download media...") tmp_directory_for_each_user = os.path.join( f"downloads/{str(update.from_user.id)}{random_char(5)}" ) if not os.path.isdir(tmp_directory_for_each_user): os.makedirs(tmp_directory_for_each_user) download_directory = os.path.join(tmp_directory_for_each_user, custom_file_name) if tg_send_type == "audio": command_to_exec = f"yt-dlp -c --ffmpeg-location '/usr/bin/mediaextract' --max-filesize 2097152000 --prefer-ffmpeg --extract-audio --embed-metadata --audio-format {youtube_dl_ext} --audio-quality {youtube_dl_format} {youtube_dl_url} --output '{download_directory}'" else: minus_f_format = youtube_dl_format if "youtu" in youtube_dl_url: minus_f_format = f"{youtube_dl_format}+bestaudio[ext=m4a]" command_to_exec = f"yt-dlp -c --ffmpeg-location '/usr/bin/mediaextract' --max-filesize 2097152000 --embed-subs --embed-metadata -f {minus_f_format} --hls-prefer-ffmpeg {youtube_dl_url} --output '{download_directory}'" start = datetime.now() t_response = (await shell_exec(command_to_exec))[0] if t_response: os.remove(save_ytdl_json_path) end_one = datetime.now() time_taken_for_download = (end_one - start).seconds file_size = 2097152000 + 1 download_directory_dirname = os.path.dirname(download_directory) download_directory_contents = os.listdir(download_directory_dirname) for download_directory_c in download_directory_contents: current_file_name = os.path.join( download_directory_dirname, download_directory_c ) file_size = os.stat(current_file_name).st_size if file_size == 0: await update.message.edit(text="File Not found 🤒") asyncio.create_task(clendir(tmp_directory_for_each_user)) return if file_size > 2097152000: await update.message.edit_caption( caption="I cannot upload files greater than 1.95GB due to Telegram API limitations. This file has size {}.".format( get_readable_file_size(file_size) ) ) asyncio.create_task(clendir(thumb_image_path)) asyncio.create_task(clendir(tmp_directory_for_each_user)) return else: await update.message.edit_caption(caption="Trying to upload..") # get the correct width, height, and duration # for videos greater than 10MB # ref: message from @BotSupport width = 0 height = 0 duration = 0 if tg_send_type != "file": metadata = extractMetadata(createParser(current_file_name)) if metadata is not None and metadata.has("duration"): duration = metadata.get("duration").seconds # get the correct width, height, and duration # for videos greater than 10MB if os.path.exists(thumb_image_path): # https://stackoverflow.com/a/21669827/4723940 Image.open(thumb_image_path).convert("RGB").save(thumb_image_path) metadata = extractMetadata(createParser(thumb_image_path)) if metadata.has("width"): width = metadata.get("width") if metadata.has("height"): height = metadata.get("height") if tg_send_type == "vm": height = width else: thumb_image_path = None start_time = time.time() # try to upload file if tg_send_type == "audio": await update.message.reply_audio( audio=current_file_name, caption=f"{os.path.basename(current_file_name)}", duration=duration, thumb=thumb_image_path, reply_to_message_id=usr.id, progress=progress_for_pyrogram, progress_args=( "Trying to upload...", update.message, start_time, ), ) elif tg_send_type == "file": await update.message.reply_document( document=current_file_name, thumb=thumb_image_path, caption=f"{os.path.basename(current_file_name)}", reply_to_message_id=usr.id, progress=progress_for_pyrogram, progress_args=( "Trying to upload...", update.message, start_time, ), ) elif tg_send_type == "vm": await update.message.reply_video_note( video_note=current_file_name, duration=duration, length=width, thumb=thumb_image_path, reply_to_message_id=usr.id, progress=progress_for_pyrogram, progress_args=( "Trying to upload...", update.message, start_time, ), ) elif tg_send_type == "video": await update.message.reply_video( video=current_file_name, caption=f"{os.path.basename(current_file_name)}", duration=duration, width=width, height=height, supports_streaming=True, reply_to_message_id=usr.id, thumb=thumb_image_path, progress=progress_for_pyrogram, progress_args=( "Trying to upload...", update.message, start_time, ), ) else: LOGGER.info("Did this happen? :\\") end_two = datetime.now() time_taken_for_upload = (end_two - end_one).seconds await update.message.edit_caption( caption="Downloaded in {} seconds.\nUploaded in {} seconds.".format( time_taken_for_download, time_taken_for_upload ) ) asyncio.create_task(clendir(thumb_image_path)) asyncio.create_task(clendir(tmp_directory_for_each_user)) await asyncio.sleep(5) await update.message.delete() async def clendir(directory): try: shutil.rmtree(directory) except: pass try: os.remove(directory) except: pass