from logging import getLogger from re import compile as recompile from uuid import uuid4 from iytdl import iYTDL, main from pyrogram import filters from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto from misskaty import app from misskaty.core.message_utils import * from misskaty.core.decorator.errors import capture_err from misskaty.core.decorator.ratelimiter import ratelimiter from misskaty.helper.http import http from misskaty.helper.localization import use_chat_lang from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL LOGGER = getLogger(__name__) regex = recompile(r"(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/(watch\?v=|embed/|v/|.+\?v=)?(?P[A-Za-z0-9\-=_]{11})") YT_DB = {} def rand_key(): return str(uuid4())[:8] @app.on_message(filters.command(["ytsearch"], COMMAND_HANDLER) & ~filters.channel) @capture_err @ratelimiter @use_chat_lang() async def ytsearch(_, message, strings): if message.sender_chat: return await kirimPesan(message, strings("no_channel")) if len(message.command) == 1: return await kirimPesan(message, strings("no_query")) query = message.text.split(" ", maxsplit=1)[1] search_key = rand_key() YT_DB[search_key] = query search = await main.VideosSearch(query).next() if search["result"] == []: return await message.reply(strings("no_res").format(kweri=query)) i = search["result"][0] out = f"{i['title']}\n" out = strings("yts_msg").format(pub=i["publishedTime"], dur=i["duration"], vi=i["viewCount"]["short"], clink=i["channel"]["link"], cname=i["channel"]["name"]) btn = InlineKeyboardMarkup( [ [ InlineKeyboardButton( f"1/{len(search['result'])}", callback_data=f"ytdl_scroll|{search_key}|1", ) ], [InlineKeyboardButton(strings("dl_btn"), callback_data=f"yt_gen|{i['id']}")], ] ) img = await get_ytthumb(i["id"]) caption = out markup = btn await message.reply_photo(img, caption=caption, reply_markup=markup, quote=True) @app.on_message(filters.command(["ytdown"], COMMAND_HANDLER)) @capture_err @ratelimiter @use_chat_lang() async def ytdownv2(_, message, strings): if not message.from_user: return await kirimPesan(message, strings("no_channel")) if len(message.command) == 1: return await message.reply(strings("invalid_link")) url = message.text.split(" ", maxsplit=1)[1] async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: try: x = await ytdl.parse(url) if x is None: return await message.reply(strings("err_parse")) img = await get_ytthumb(x.key) caption = x.caption markup = x.buttons await message.reply_photo(img, caption=caption, reply_markup=markup, quote=True) except Exception as err: await kirimPesan(message, f"Opps, ERROR: {str(err)}") @app.on_callback_query(filters.regex(r"^yt_listall")) @ratelimiter @use_chat_lang() async def ytdl_listall_callback(_, cq: CallbackQuery, strings): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer(strings("unauth"), True) callback = cq.data.split("|") async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: media, buttons = await ytdl.listview(callback[1]) await cq.edit_message_media(media=media, reply_markup=buttons.add(cq.from_user.id)) @app.on_callback_query(filters.regex(r"^yt_extract_info")) @ratelimiter @use_chat_lang() async def ytdl_extractinfo_callback(_, cq: CallbackQuery, strings): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer(strings("unauth"), True) await cq.answer(strings("wait")) callback = cq.data.split("|") async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: if data := await ytdl.extract_info_from_key(callback[1]): if len(key) == 11: await cq.edit_message_text( text=data.caption, reply_markup=data.buttons.add(cq.from_user.id), ) else: await cq.edit_message_media( media=( InputMediaPhoto( media=data.image_url, caption=data.caption, ) ), reply_markup=data.buttons.add(cq.from_user.id), ) @app.on_callback_query(filters.regex(r"^yt_(gen|dl)")) @ratelimiter @use_chat_lang() async def ytdl_gendl_callback(_, cq: CallbackQuery, strings): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer(strings("unauth"), True) callback = cq.data.split("|") key = callback[1] if callback[0] == "yt_gen": if match := regex.match(cq.message.reply_to_message.command[1]) or len(callback) == 2: x = await main.Extractor().get_download_button(key) await cq.edit_message_caption(caption=x.caption, reply_markup=x.buttons) else: uid = callback[2] type_ = callback[3] format_ = "audio" if type_ == "a" else "video" async with iYTDL( log_group_id=LOG_CHANNEL, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract", delete_media=True, ) as ytdl: try: upload_key = await ytdl.download(cq.message.reply_to_message.command[1], uid, format_, cq, True, 3) await ytdl.upload(app, upload_key[0], format_, cq, True) except Exception as err: await cq.edit_message_caption(err) else: uid = callback[2] type_ = callback[3] format_ = "audio" if type_ == "a" else "video" async with iYTDL( log_group_id=LOG_CHANNEL, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract", delete_media=True, ) as ytdl: try: upload_key = await ytdl.download( f"https://www.youtube.com/watch?v={key}", uid, format_, cq, True, 3, ) await ytdl.upload(app, upload_key[0], format_, cq, True) except Exception as err: await cq.edit_message_caption(err) @app.on_callback_query(filters.regex(r"^ytdl_scroll")) @ratelimiter @use_chat_lang() async def ytdl_scroll_callback(_, cq: CallbackQuery, strings): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer(strings("unauth"), True) callback = cq.data.split("|") search_key = callback[1] page = int(callback[2]) query = YT_DB[search_key] search = await main.VideosSearch(query).next() i = search["result"][page] out = f"{i['title']}" out = strings("yts_msg").format(pub=i["publishedTime"], dur=i["duration"], vi=i["viewCount"]["short"], clink=i["channel"]["link"], cname=i["channel"]["name"]) scroll_btn = [ [ InlineKeyboardButton(strings("back"), callback_data=f"ytdl_scroll|{search_key}|{page-1}"), InlineKeyboardButton( f"{page+1}/{len(search['result'])}", callback_data=f"ytdl_scroll|{search_key}|{page+1}", ), ] ] if page == 0: if len(search["result"]) == 1: return await cq.answer(strings("endlist"), show_alert=True) scroll_btn = [[scroll_btn.pop().pop()]] elif page == (len(search["result"]) - 1): scroll_btn = [[scroll_btn.pop().pop(0)]] btn = [[InlineKeyboardButton(strings("dl_btn"), callback_data=f"yt_gen|{i['id']}")]] btn = InlineKeyboardMarkup(scroll_btn + btn) await cq.edit_message_media(InputMediaPhoto(await get_ytthumb(i["id"]), caption=out), reply_markup=btn) async def get_ytthumb(videoid: str): thumb_quality = [ "maxresdefault.jpg", # Best quality "hqdefault.jpg", "sddefault.jpg", "mqdefault.jpg", "default.jpg", # Worst quality ] thumb_link = "https://i.imgur.com/4LwPLai.png" for qualiy in thumb_quality: link = f"https://i.ytimg.com/vi/{videoid}/{qualiy}" if (await http.get(link)).status_code == 200: thumb_link = link break return thumb_link