from logging import getLogger
from re import compile as recompile
from uuid import uuid4

from iytdl import iYTDL, main
from pyrogram import filters
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto

from misskaty import app
from misskaty.core.message_utils import *
from misskaty.core.decorator.errors import capture_err
from misskaty.core.decorator.ratelimiter import ratelimiter
from misskaty.helper.http import http
from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL

LOGGER = getLogger(__name__)

# Matches YouTube video URLs and captures the 11-character video id.
# NOTE(review): the named group previously had no name (`(?P[`), which does not
# compile and made the module fail at import time. The name `id` is assumed;
# nothing in this file reads the group by name, only whether the pattern matched.
regex = recompile(
    r"(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/"
    r"(watch\?v=|embed/|v/|.+\?v=)?(?P<id>[A-Za-z0-9\-=_]{11})"
)

# Maps a short search key (see rand_key) to the query text typed by the user, so
# that scroll callbacks can repeat the search. Entries are never removed.
YT_DB = {}


def rand_key() -> str:
    """Return the first eight characters of a freshly generated UUID4 string."""
    return str(uuid4())[:8]


def _ytdl_session(log_group_id=0, **options):
    """Build an iYTDL instance with the cache path and ffmpeg location used by every handler."""
    return iYTDL(
        log_group_id=log_group_id,
        cache_path="cache",
        ffmpeg_location="/usr/bin/mediaextract",
        **options,
    )


def _format_result(item) -> str:
    """Build the caption text for one entry of a search result list."""
    text = f"{item['title']}"
    text += f"\nPublished {item['publishedTime']}\n"
    text += f"\n❯ Duration: {item['duration']}"
    text += f"\n❯ Views: {item['viewCount']['short']}"
    text += f"\n❯ Uploader: {item['channel']['name']}\n\n"
    return text


async def _download_and_upload(cq, url, uid, type_):
    """Download ``url`` and upload the result, reporting any failure in the message caption.

    ``type_`` equal to ``"a"`` selects the "audio" format; any other value selects "video".
    """
    format_ = "audio" if type_ == "a" else "video"
    async with _ytdl_session(log_group_id=LOG_CHANNEL, delete_media=True) as ytdl:
        try:
            upload_key = await ytdl.download(url, uid, format_, cq, True, 3)
            await ytdl.upload(app, upload_key[0], format_, cq, True)
        except Exception as err:
            # The caption must be text; the exception object itself is not a string.
            await cq.edit_message_caption(str(err))


@app.on_message(filters.command(["ytsearch"], COMMAND_HANDLER) & ~filters.channel)
@capture_err
@ratelimiter
async def ytsearch(_, message):
    """Handle the ``ytsearch`` command: search for videos and reply with the first hit.

    The query is stored in YT_DB under a fresh key so the scroll button can
    repeat the search later.
    """
    if message.sender_chat:
        return await kirimPesan(message, "This feature not supported for channel.")
    if len(message.command) == 1:
        return await kirimPesan(message, "Please input a query..!")
    query = message.text.split(" ", maxsplit=1)[1]
    search_key = rand_key()
    YT_DB[search_key] = query
    search = await main.VideosSearch(query).next()
    results = search["result"]
    if not results:
        return await message.reply(f"No result found for `{query}`")
    first = results[0]
    markup = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    f"1/{len(results)}",
                    callback_data=f"ytdl_scroll|{search_key}|1",
                )
            ],
            [InlineKeyboardButton("Download", callback_data=f"yt_gen|{first['id']}")],
        ]
    )
    img = await get_ytthumb(first["id"])
    await message.reply_photo(img, caption=_format_result(first), reply_markup=markup, quote=True)


@app.on_message(filters.command(["ytdown"], COMMAND_HANDLER))
@capture_err
@ratelimiter
async def ytdownv2(_, message):
    """Handle the ``ytdown`` command: parse the given URL and reply with its download menu."""
    if not message.from_user:
        return
    if len(message.command) == 1:
        return await message.reply("Please input a valid YT-DLP Supported URL")
    url = message.text.split(" ", maxsplit=1)[1]
    async with _ytdl_session() as ytdl:
        try:
            parsed = await ytdl.parse(url)
            if parsed is None:
                return await message.reply("Failed parse URL, check logs..")
            img = await get_ytthumb(parsed.key)
            await message.reply_photo(img, caption=parsed.caption, reply_markup=parsed.buttons, quote=True)
        except Exception as err:
            await kirimPesan(message, f"Opps, ERROR: {err}")


@app.on_callback_query(filters.regex(r"^yt_listall"))
@ratelimiter
async def ytdl_listall_callback(_, cq: CallbackQuery):
    """Replace the message media with the list view for the key carried in the callback data."""
    # Only the user who sent the replied-to command may use its buttons.
    if cq.from_user.id != cq.message.reply_to_message.from_user.id:
        return await cq.answer("Not your task", True)
    callback = cq.data.split("|")
    async with _ytdl_session() as ytdl:
        media, buttons = await ytdl.listview(callback[1])
        await cq.edit_message_media(media=media, reply_markup=buttons.add(cq.from_user.id))


@app.on_callback_query(filters.regex(r"^yt_extract_info"))
@ratelimiter
async def ytdl_extractinfo_callback(_, cq: CallbackQuery):
    """Show the extracted info for the key carried in the callback data."""
    if cq.from_user.id != cq.message.reply_to_message.from_user.id:
        return await cq.answer("Not your task", True)
    await cq.answer("Please Wait...")
    callback = cq.data.split("|")
    # Previously `key` was read without ever being assigned (NameError).
    key = callback[1]
    async with _ytdl_session() as ytdl:
        if data := await ytdl.extract_info_from_key(key):
            buttons = data.buttons.add(cq.from_user.id)
            # presumably an 11-character key is a YouTube video id — TODO confirm
            if len(key) == 11:
                await cq.edit_message_text(text=data.caption, reply_markup=buttons)
            else:
                await cq.edit_message_media(
                    media=InputMediaPhoto(media=data.image_url, caption=data.caption),
                    reply_markup=buttons,
                )


@app.on_callback_query(filters.regex(r"^yt_(gen|dl)"))
@ratelimiter
async def ytdl_gendl_callback(_, cq: CallbackQuery):
    """Handle ``yt_gen`` / ``yt_dl`` buttons: show the download menu or start a download.

    Callback data is ``|``-separated: action, key, and for downloads also a
    uid and a type flag.
    """
    if cq.from_user.id != cq.message.reply_to_message.from_user.id:
        return await cq.answer("Not your task", True)
    callback = cq.data.split("|")
    key = callback[1]
    if callback[0] == "yt_gen":
        source_url = cq.message.reply_to_message.command[1]
        # No assignment expression here: the matched value was never used, and
        # `name := a or b` binds the whole `or` expression, not the match.
        if regex.match(source_url) or len(callback) == 2:
            menu = await main.Extractor().get_download_button(key)
            await cq.edit_message_caption(caption=menu.caption, reply_markup=menu.buttons)
        else:
            await _download_and_upload(cq, source_url, callback[2], callback[3])
    else:
        await _download_and_upload(
            cq,
            f"https://www.youtube.com/watch?v={key}",
            callback[2],
            callback[3],
        )


@app.on_callback_query(filters.regex(r"^ytdl_scroll"))
@ratelimiter
async def ytdl_scroll_callback(_, cq: CallbackQuery):
    """Move a search result message to the page requested in the callback data.

    The stored query is searched again and the message is edited to show the
    entry at index ``page``. Unknown search keys and out-of-range pages are
    answered with an alert instead of raising.
    """
    if cq.from_user.id != cq.message.reply_to_message.from_user.id:
        return await cq.answer("Not your task", True)
    callback = cq.data.split("|")
    search_key = callback[1]
    page = int(callback[2])
    query = YT_DB.get(search_key)
    if query is None:
        # YT_DB lives only in memory, so buttons created before a restart have no entry.
        return await cq.answer("Search session expired, please search again.", show_alert=True)
    search = await main.VideosSearch(query).next()
    results = search["result"]
    total = len(results)
    # A single-result list has nowhere to scroll to, and the "1/1" button sent by
    # ytsearch asks for index 1, which does not exist.
    if total == 1 or not 0 <= page < total:
        return await cq.answer("That's the end of list", show_alert=True)
    item = results[page]
    back_btn = InlineKeyboardButton("Back", callback_data=f"ytdl_scroll|{search_key}|{page-1}")
    next_btn = InlineKeyboardButton(
        f"{page+1}/{total}",
        callback_data=f"ytdl_scroll|{search_key}|{page+1}",
    )
    if page == 0:
        nav_row = [next_btn]
    elif page == total - 1:
        nav_row = [back_btn]
    else:
        nav_row = [back_btn, next_btn]
    download_row = [InlineKeyboardButton("Download", callback_data=f"yt_gen|{item['id']}")]
    markup = InlineKeyboardMarkup([nav_row, download_row])
    await cq.edit_message_media(
        InputMediaPhoto(await get_ytthumb(item["id"]), caption=_format_result(item)),
        reply_markup=markup,
    )


async def get_ytthumb(videoid: str) -> str:
    """Return a thumbnail URL for ``videoid``.

    Candidate file names are requested from best to worst quality and the first
    one answered with status 200 is returned. If none is, a fixed fallback
    image URL is returned.
    """
    thumb_quality = [
        "maxresdefault.jpg",  # Best quality
        "hqdefault.jpg",
        "sddefault.jpg",
        "mqdefault.jpg",
        "default.jpg",  # Worst quality
    ]
    for quality in thumb_quality:
        link = f"https://i.ytimg.com/vi/{videoid}/{quality}"
        if (await http.get(link)).status_code == 200:
            return link
    return "https://i.imgur.com/4LwPLai.png"