from logging import getLogger from re import compile as recompile from uuid import uuid4 from iytdl import iYTDL, main from pyrogram import filters from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto from misskaty import app from misskaty.core.message_utils import * from misskaty.core.decorator.errors import capture_err from misskaty.core.decorator.pyro_cooldown import wait from misskaty.helper.http import http from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL LOGGER = getLogger(__name__) regex = recompile(r"(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/(watch\?v=|embed/|v/|.+\?v=)?(?P[A-Za-z0-9\-=_]{11})") YT_DB = {} def rand_key(): return str(uuid4())[:8] @app.on_message(filters.command(["ytsearch"], COMMAND_HANDLER) & ~filters.channel & wait(30)) @capture_err async def ytsearch(_, message): if message.sender_chat: return await kirimPesan(message, "This feature not supported for channel.") if len(message.command) == 1: return await kirimPesan(message, "Please input a query..!") query = message.text.split(" ", maxsplit=1)[1] search_key = rand_key() YT_DB[search_key] = query search = await main.VideosSearch(query).next() if search["result"] == []: return await message.reply(f"No result found for `{query}`") i = search["result"][0] out = f"{i['title']}" out += f"\nPublished {i['publishedTime']}\n" out += f"\n❯ Duration: {i['duration']}" out += f"\n❯ Views: {i['viewCount']['short']}" out += f"\n❯ Uploader: {i['channel']['name']}\n\n" btn = InlineKeyboardMarkup( [ [ InlineKeyboardButton( f"1/{len(search['result'])}", callback_data=f"ytdl_scroll|{search_key}|1", ) ], [InlineKeyboardButton("Download", callback_data=f"yt_gen|{i['id']}")], ] ) img = await get_ytthumb(i["id"]) caption = out markup = btn await message.reply_photo(img, caption=caption, reply_markup=markup, quote=True) @app.on_message(filters.command(["ytdown"], COMMAND_HANDLER)) @capture_err async def ytdownv2(_, message): if not message.from_user: return if len(message.command) == 1: return await message.reply("Please input a valid YT-DLP Supported URL") url = message.text.split(" ", maxsplit=1)[1] async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: x = await ytdl.parse(url) if x is None: return await message.reply("Failed parse URL, check logs..") img = await get_ytthumb(x.key) caption = x.caption markup = x.buttons await message.reply_photo(img, caption=caption, reply_markup=markup, quote=True) @app.on_callback_query(filters.regex(r"^yt_listall")) async def ytdl_listall_callback(_, cq: CallbackQuery): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer("Not your task", True) callback = cq.data.split("|") async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: media, buttons = await ytdl.listview(callback[1]) await cq.edit_message_media(media=media, reply_markup=buttons.add(cq.from_user.id)) @app.on_callback_query(filters.regex(r"^yt_extract_info")) async def ytdl_extractinfo_callback(_, cq: CallbackQuery): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer("Not your task", True) await cq.answer("Please Wait...") callback = cq.data.split("|") async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: if data := await ytdl.extract_info_from_key(callback[1]): if len(key) == 11: await cq.edit_message_text( text=data.caption, reply_markup=data.buttons.add(cq.from_user.id), ) else: await cq.edit_message_media( media=( InputMediaPhoto( media=data.image_url, caption=data.caption, ) ), reply_markup=data.buttons.add(cq.from_user.id), ) @app.on_callback_query(filters.regex(r"^yt_(gen|dl)")) async def ytdl_gendl_callback(_, cq: CallbackQuery): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer("Not your task", True) callback = cq.data.split("|") key = callback[1] if callback[0] == "yt_gen": if match := regex.match(cq.message.reply_to_message.command[1]) or len(callback) == 2: x = await main.Extractor().get_download_button(key) await cq.edit_message_caption(caption=x.caption, reply_markup=x.buttons) else: uid = callback[2] type_ = callback[3] format_ = "audio" if type_ == "a" else "video" async with iYTDL( log_group_id=LOG_CHANNEL, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract", delete_media=True, ) as ytdl: upload_key = await ytdl.download(cq.message.reply_to_message.command[1], uid, format_, cq, True, 3) await ytdl.upload(app, upload_key, format_, cq, True) else: uid = callback[2] type_ = callback[3] format_ = "audio" if type_ == "a" else "video" async with iYTDL( log_group_id=LOG_CHANNEL, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract", delete_media=True, ) as ytdl: upload_key = await ytdl.download( f"https://www.youtube.com/watch?v={key}", uid, format_, cq, True, 3, ) await ytdl.upload(app, upload_key, format_, cq, True) @app.on_callback_query(filters.regex(r"^ytdl_scroll")) async def ytdl_scroll_callback(_, cq: CallbackQuery): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer("Not your task", True) callback = cq.data.split("|") search_key = callback[1] page = int(callback[2]) query = YT_DB[search_key] search = await main.VideosSearch(query).next() i = search["result"][page] out = f"{i['title']}" out += f"\nPublished {i['publishedTime']}\n" out += f"\n❯ Duration: {i['duration']}" out += f"\n❯ Views: {i['viewCount']['short']}" out += f"\n❯ Uploader: {i['channel']['name']}\n\n" scroll_btn = [ [ InlineKeyboardButton("Back", callback_data=f"ytdl_scroll|{search_key}|{page-1}"), InlineKeyboardButton( f"{page+1}/{len(search['result'])}", callback_data=f"ytdl_scroll|{search_key}|{page+1}", ), ] ] if page == 0: if len(search["result"]) == 1: return await cq.answer("That's the end of list", show_alert=True) scroll_btn = [[scroll_btn.pop().pop()]] elif page == (len(search["result"]) - 1): scroll_btn = [[scroll_btn.pop().pop(0)]] btn = [[InlineKeyboardButton("Download", callback_data=f"yt_gen|{i['id']}")]] btn = InlineKeyboardMarkup(scroll_btn + btn) await cq.edit_message_media(InputMediaPhoto(await get_ytthumb(i["id"]), caption=out), reply_markup=btn) async def get_ytthumb(videoid: str): thumb_quality = [ "maxresdefault.jpg", # Best quality "hqdefault.jpg", "sddefault.jpg", "mqdefault.jpg", "default.jpg", # Worst quality ] thumb_link = "https://i.imgur.com/4LwPLai.png" for qualiy in thumb_quality: link = f"https://i.ytimg.com/vi/{videoid}/{qualiy}" if (await http.get(link)).status_code == 200: thumb_link = link break return thumb_link