MissKatyPyro/misskaty/plugins/ytdl_plugins.py
yasirarism d9557b828f
Fixx
2023-06-16 14:09:04 +00:00

240 lines
9.3 KiB
Python

from logging import getLogger
from re import compile as recompile
from uuid import uuid4
from iytdl import iYTDL, main, Process
from iytdl.exceptions import DownloadFailedError
from iytdl.constants import YT_VID_URL
from pyrogram import filters, Client
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto, Message
from misskaty import app
from misskaty.core.decorator.errors import capture_err
from misskaty.core.misskaty_patch.listen.listen import ListenerTimeout
from misskaty.core.decorator.ratelimiter import ratelimiter
from misskaty.helper.http import http
from misskaty.helper.localization import use_chat_lang
from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL, FF_MPEG_NAME
LOGGER = getLogger(__name__)
YT_REGEX = "^(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/(watch\?v=|embed/|v/|.+\?v=)?(?P<id>[A-Za-z0-9\-=_]{11})"
YT_DB = {}
def rand_key():
return str(uuid4())[:8]
@app.on_message(filters.command(["ytsearch"], COMMAND_HANDLER) & ~filters.channel)
@capture_err
@ratelimiter
@use_chat_lang()
async def ytsearch(self: Client, ctx: Message, strings):
if ctx.sender_chat:
return await ctx.reply_msg(strings("no_channel"))
if len(ctx.command) == 1:
return await ctx.reply_msg(strings("no_query"))
query = ctx.text.split(" ", maxsplit=1)[1]
search_key = rand_key()
YT_DB[search_key] = query
search = await main.VideosSearch(query).next()
if search["result"] == []:
return await ctx.reply_msg(strings("no_res").format(kweri=query))
i = search["result"][0]
out = f"<b><a href={i['link']}>{i['title']}</a></b>\n"
out = strings("yts_msg").format(pub=i["publishedTime"], dur=i["duration"], vi=i["viewCount"]["short"], clink=i["channel"]["link"], cname=i["channel"]["name"])
btn = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
f"1/{len(search['result'])}",
callback_data=f"ytdl_scroll|{search_key}|1",
)
],
[InlineKeyboardButton(strings("dl_btn"), callback_data=f"yt_gen|{i['id']}")],
]
)
img = await get_ytthumb(i["id"])
caption = out
markup = btn
await ctx.reply_photo(img, caption=caption, reply_markup=markup, quote=True)
@app.on_message(filters.command(["ytdown"], COMMAND_HANDLER) | filters.regex(YT_REGEX) & ~filters.channel)
@capture_err
@ratelimiter
@use_chat_lang()
async def ytdownv2(self: Client, ctx: Message, strings):
if not ctx.from_user:
return await ctx.reply_msg(strings("no_channel"))
if ctx.command and len(ctx.command) == 1:
return await ctx.reply_msg(strings("invalid_link"))
url = ctx.input if ctx.command and len(ctx.command) > 1 else ctx.text
async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location=f"/usr/bin/{FF_MPEG_NAME}") as ytdl:
try:
x = await ytdl.parse(url, extract=True)
if x is None:
return await ctx.reply_msg(strings("err_parse"))
caption = x.caption
markup = x.buttons
photo = x.image_url
msg = await ctx.reply_photo(photo, caption=caption, reply_markup=markup, quote=True)
await msg.wait_for_click(
from_user_id=ctx.from_user.id,
timeout=30
)
except ListenerTimeout:
await msg.edit_caption(strings("exp_task", context="general"))
except Exception as err:
await ctx.reply_msg(f"Opps, ERROR: {str(err)}")
@app.on_callback_query(filters.regex(r"^yt_listall"))
@ratelimiter
@use_chat_lang()
async def ytdl_listall_callback(self: Client, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
callback = cq.data.split("|")
async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location=f"/usr/bin/{FF_MPEG_NAME}") as ytdl:
media, buttons = await ytdl.listview(callback[1])
await cq.edit_message_media(media=media, reply_markup=buttons.add(cq.from_user.id))
@app.on_callback_query(filters.regex(r"^yt_extract_info"))
@ratelimiter
@use_chat_lang()
async def ytdl_extractinfo_callback(self: Client, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
await cq.answer(strings("wait"))
callback = cq.data.split("|")
async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location=f"/usr/bin/{FF_MPEG_NAME}") as ytdl:
try:
if data := await ytdl.extract_info_from_key(callback[1]):
if len(callback[1]) == 11:
await cq.edit_message_text(
text=data.caption,
reply_markup=data.buttons.add(cq.from_user.id),
)
else:
await cq.edit_message_media(
media=(
InputMediaPhoto(
media=data.image_url,
caption=data.caption,
)
),
reply_markup=data.buttons.add(cq.from_user.id),
)
except Exception as e:
await cq.edit_message_text(f"Extract Info Failed -> {e}")
@app.on_callback_query(filters.regex(r"^yt_(gen|dl)"))
@ratelimiter
@use_chat_lang()
async def ytdl_gendl_callback(self: Client, cq: CallbackQuery, strings):
match = cq.data.split("|")
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
async with iYTDL(
log_group_id=LOG_CHANNEL,
cache_path="cache",
ffmpeg_location=f"/usr/bin/{FF_MPEG_NAME}",
delete_media=True,
) as ytdl:
try:
if match[0] == "yt_gen":
yt_url = False
video_link = await ytdl.cache.get_url(match[1])
else:
yt_url = True
video_link = f"{YT_VID_URL}{match[1]}"
media_type = "video" if match[3] == "v" else "audio"
uid, disp_str = ytdl.get_choice_by_id(match[2], media_type, yt_url=yt_url)
key = await ytdl.download(
url=video_link,
uid=uid,
downtype=media_type,
update=cq,
)
await ytdl.upload(
client=self,
key=key[0],
downtype=media_type,
update=cq,
)
except DownloadFailedError as e:
await cq.edit_message_caption(f"Download Failed - {e}")
except Exception as err:
await cq.edit_message_caption(f"Download Failed for url -> {video_link}\n\n<b>ERROR:</b> <code>{err}</code>")
@app.on_callback_query(filters.regex(r"^yt_cancel"))
@ratelimiter
@use_chat_lang()
async def ytdl_cancel_callback(self: Client, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
callback = cq.data.split("|")
await cq.answer("Trying to Cancel Process..")
process_id = callback[1]
Process.cancel_id(process_id)
if cq.message:
await cq.message.delete()
else:
await cq.edit_message_text("✔️ `Stopped Successfully`")
@app.on_callback_query(filters.regex(r"^ytdl_scroll"))
@ratelimiter
@use_chat_lang()
async def ytdl_scroll_callback(self: Client, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
callback = cq.data.split("|")
search_key = callback[1]
page = int(callback[2])
query = YT_DB[search_key]
search = await main.VideosSearch(query).next()
i = search["result"][page]
out = f"<b><a href={i['link']}>{i['title']}</a></b>"
out = strings("yts_msg").format(pub=i["publishedTime"], dur=i["duration"], vi=i["viewCount"]["short"], clink=i["channel"]["link"], cname=i["channel"]["name"])
scroll_btn = [
[
InlineKeyboardButton(strings("back"), callback_data=f"ytdl_scroll|{search_key}|{page-1}"),
InlineKeyboardButton(
f"{page+1}/{len(search['result'])}",
callback_data=f"ytdl_scroll|{search_key}|{page+1}",
),
]
]
if page == 0:
if len(search["result"]) == 1:
return await cq.answer(strings("endlist"), show_alert=True)
scroll_btn = [[scroll_btn.pop().pop()]]
elif page == (len(search["result"]) - 1):
scroll_btn = [[scroll_btn.pop().pop(0)]]
btn = [[InlineKeyboardButton(strings("dl_btn"), callback_data=f"yt_gen|{i['id']}")]]
btn = InlineKeyboardMarkup(scroll_btn + btn)
await cq.edit_message_media(InputMediaPhoto(await get_ytthumb(i["id"]), caption=out), reply_markup=btn)
async def get_ytthumb(videoid: str):
thumb_quality = [
"maxresdefault.jpg", # Best quality
"hqdefault.jpg",
"sddefault.jpg",
"mqdefault.jpg",
"default.jpg", # Worst quality
]
thumb_link = "https://i.imgur.com/4LwPLai.png"
for qualiy in thumb_quality:
link = f"https://i.ytimg.com/vi/{videoid}/{qualiy}"
if (await http.get(link)).status_code == 200:
thumb_link = link
break
return thumb_link