MissKatyPyro/misskaty/plugins/ytdl_plugins.py
2023-04-03 09:32:26 +07:00

222 lines
8.6 KiB
Python

from logging import getLogger
from re import compile as recompile
from uuid import uuid4
from iytdl import iYTDL, main
from pyrogram import filters
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto
from misskaty import app
from misskaty.core.message_utils import *
from misskaty.core.decorator.errors import capture_err
from misskaty.core.decorator.ratelimiter import ratelimiter
from misskaty.helper.http import http
from misskaty.helper.localization import use_chat_lang
from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL
LOGGER = getLogger(__name__)
regex = recompile(r"(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/(watch\?v=|embed/|v/|.+\?v=)?(?P<id>[A-Za-z0-9\-=_]{11})")
YT_DB = {}
def rand_key():
return str(uuid4())[:8]
@app.on_message(filters.command(["ytsearch"], COMMAND_HANDLER) & ~filters.channel)
@capture_err
@ratelimiter
@use_chat_lang()
async def ytsearch(_, message, strings):
if message.sender_chat:
return await kirimPesan(message, strings("no_channel"))
if len(message.command) == 1:
return await kirimPesan(message, strings("no_query"))
query = message.text.split(" ", maxsplit=1)[1]
search_key = rand_key()
YT_DB[search_key] = query
search = await main.VideosSearch(query).next()
if search["result"] == []:
return await message.reply(strings("no_res").format(kweri=query))
i = search["result"][0]
out = f"<b><a href={i['link']}>{i['title']}</a></b>\n"
out = strings("yts_msg").format(pub=i["publishedTime"], dur=i["duration"], vi=i["viewCount"]["short"], clink=i["channel"]["link"], cname=i["channel"]["name"])
btn = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
f"1/{len(search['result'])}",
callback_data=f"ytdl_scroll|{search_key}|1",
)
],
[InlineKeyboardButton(strings("dl_btn"), callback_data=f"yt_gen|{i['id']}")],
]
)
img = await get_ytthumb(i["id"])
caption = out
markup = btn
await message.reply_photo(img, caption=caption, reply_markup=markup, quote=True)
@app.on_message(filters.command(["ytdown"], COMMAND_HANDLER))
@capture_err
@ratelimiter
@use_chat_lang()
async def ytdownv2(_, message, strings):
if not message.from_user:
return await kirimPesan(message, strings("no_channel"))
if len(message.command) == 1:
return await message.reply(strings("invalid_link"))
url = message.text.split(" ", maxsplit=1)[1]
async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl:
try:
x = await ytdl.parse(url)
if x is None:
return await message.reply(strings("err_parse"))
img = await get_ytthumb(x.key)
caption = x.caption
markup = x.buttons
await message.reply_photo(img, caption=caption, reply_markup=markup, quote=True)
except Exception as err:
await kirimPesan(message, f"Opps, ERROR: {str(err)}")
@app.on_callback_query(filters.regex(r"^yt_listall"))
@ratelimiter
@use_chat_lang()
async def ytdl_listall_callback(_, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
callback = cq.data.split("|")
async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl:
media, buttons = await ytdl.listview(callback[1])
await cq.edit_message_media(media=media, reply_markup=buttons.add(cq.from_user.id))
@app.on_callback_query(filters.regex(r"^yt_extract_info"))
@ratelimiter
@use_chat_lang()
async def ytdl_extractinfo_callback(_, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
await cq.answer(strings("wait"))
callback = cq.data.split("|")
async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl:
if data := await ytdl.extract_info_from_key(callback[1]):
if len(key) == 11:
await cq.edit_message_text(
text=data.caption,
reply_markup=data.buttons.add(cq.from_user.id),
)
else:
await cq.edit_message_media(
media=(
InputMediaPhoto(
media=data.image_url,
caption=data.caption,
)
),
reply_markup=data.buttons.add(cq.from_user.id),
)
@app.on_callback_query(filters.regex(r"^yt_(gen|dl)"))
@ratelimiter
@use_chat_lang()
async def ytdl_gendl_callback(_, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
callback = cq.data.split("|")
key = callback[1]
if callback[0] == "yt_gen":
if match := regex.match(cq.message.reply_to_message.command[1]) or len(callback) == 2:
x = await main.Extractor().get_download_button(key)
await cq.edit_message_caption(caption=x.caption, reply_markup=x.buttons)
else:
uid = callback[2]
type_ = callback[3]
format_ = "audio" if type_ == "a" else "video"
async with iYTDL(
log_group_id=LOG_CHANNEL,
cache_path="cache",
ffmpeg_location="/usr/bin/mediaextract",
delete_media=True,
) as ytdl:
try:
upload_key = await ytdl.download(cq.message.reply_to_message.command[1], uid, format_, cq, True, 3)
await ytdl.upload(app, upload_key[0], format_, cq, True)
except Exception as err:
await cq.edit_message_caption(err)
else:
uid = callback[2]
type_ = callback[3]
format_ = "audio" if type_ == "a" else "video"
async with iYTDL(
log_group_id=LOG_CHANNEL,
cache_path="cache",
ffmpeg_location="/usr/bin/mediaextract",
delete_media=True,
) as ytdl:
try:
upload_key = await ytdl.download(
f"https://www.youtube.com/watch?v={key}",
uid,
format_,
cq,
True,
3,
)
await ytdl.upload(app, upload_key[0], format_, cq, True)
except Exception as err:
await cq.edit_message_caption(err)
@app.on_callback_query(filters.regex(r"^ytdl_scroll"))
@ratelimiter
@use_chat_lang()
async def ytdl_scroll_callback(_, cq: CallbackQuery, strings):
if cq.from_user.id != cq.message.reply_to_message.from_user.id:
return await cq.answer(strings("unauth"), True)
callback = cq.data.split("|")
search_key = callback[1]
page = int(callback[2])
query = YT_DB[search_key]
search = await main.VideosSearch(query).next()
i = search["result"][page]
out = f"<b><a href={i['link']}>{i['title']}</a></b>"
out = strings("yts_msg").format(pub=i["publishedTime"], dur=i["duration"], vi=i["viewCount"]["short"], clink=i["channel"]["link"], cname=i["channel"]["name"])
scroll_btn = [
[
InlineKeyboardButton(strings("back"), callback_data=f"ytdl_scroll|{search_key}|{page-1}"),
InlineKeyboardButton(
f"{page+1}/{len(search['result'])}",
callback_data=f"ytdl_scroll|{search_key}|{page+1}",
),
]
]
if page == 0:
if len(search["result"]) == 1:
return await cq.answer(strings("endlist"), show_alert=True)
scroll_btn = [[scroll_btn.pop().pop()]]
elif page == (len(search["result"]) - 1):
scroll_btn = [[scroll_btn.pop().pop(0)]]
btn = [[InlineKeyboardButton(strings("dl_btn"), callback_data=f"yt_gen|{i['id']}")]]
btn = InlineKeyboardMarkup(scroll_btn + btn)
await cq.edit_message_media(InputMediaPhoto(await get_ytthumb(i["id"]), caption=out), reply_markup=btn)
async def get_ytthumb(videoid: str):
thumb_quality = [
"maxresdefault.jpg", # Best quality
"hqdefault.jpg",
"sddefault.jpg",
"mqdefault.jpg",
"default.jpg", # Worst quality
]
thumb_link = "https://i.imgur.com/4LwPLai.png"
for qualiy in thumb_quality:
link = f"https://i.ytimg.com/vi/{videoid}/{qualiy}"
if (await http.get(link)).status_code == 200:
thumb_link = link
break
return thumb_link