* admin mention fix

* Fix

* cache admin

* Update __init__.py

* style: format code with black and isort (#105)

Format code with black and isort

This commit fixes the style issues introduced in 57fb50e according to the output
from Black and isort.

Details: https://app.deepsource.com/gh/yasirarism/MissKatyPyro/transform/c3ccbdad-dd4f-44e8-b2ce-1898c1c9240e/

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>

---------

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
This commit is contained in:
yasirarism 2023-06-26 15:07:17 +00:00 committed by GitHub
parent bf4d1b7195
commit 3473c36f62
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 423 additions and 212 deletions

View file

@ -41,7 +41,7 @@ MOD_NOLOAD = ["subscene_dl"]
HELPABLE = {} HELPABLE = {}
cleanmode = {} cleanmode = {}
botStartTime = time.time() botStartTime = time.time()
misskaty_version = "v2.8.7 - Stable" misskaty_version = "v2.9.1 - Stable"
# Pyrogram Bot Client # Pyrogram Bot Client
app = Client( app = Client(
@ -58,7 +58,11 @@ user = Client(
session_string=USER_SESSION, session_string=USER_SESSION,
) )
jobstores = {"default": MongoDBJobStore(client=MongoClient(DATABASE_URI), database=DATABASE_NAME, collection="nightmode")} jobstores = {
"default": MongoDBJobStore(
client=MongoClient(DATABASE_URI), database=DATABASE_NAME, collection="nightmode"
)
}
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone=TZ) scheduler = AsyncIOScheduler(jobstores=jobstores, timezone=TZ)
app.start() app.start()

View file

@ -3,6 +3,7 @@ from time import time
from traceback import format_exc as err from traceback import format_exc as err
from typing import Optional, Union from typing import Optional, Union
from cachetools import TTLCache
from pyrogram import Client, enums from pyrogram import Client, enums
from pyrogram.errors import ChannelPrivate, ChatAdminRequired, ChatWriteForbidden from pyrogram.errors import ChannelPrivate, ChatAdminRequired, ChatWriteForbidden
from pyrogram.types import CallbackQuery, Message from pyrogram.types import CallbackQuery, Message
@ -77,16 +78,22 @@ async def check_perms(
if isinstance(permissions, str): if isinstance(permissions, str):
permissions = [permissions] permissions = [permissions]
missing_perms = [permission for permission in permissions if not getattr(user.privileges, permission)] missing_perms = [
permission
for permission in permissions
if not getattr(user.privileges, permission)
]
if not missing_perms: if not missing_perms:
return True return True
if complain_missing_perms: if complain_missing_perms:
await sender(strings("no_permission_error").format(permissions=", ".join(missing_perms))) await sender(
strings("no_permission_error").format(permissions=", ".join(missing_perms))
)
return False return False
admins_in_chat = {} admins_in_chat = TTLCache(maxsize=1000, ttl=6 * 60 * 60)
async def list_admins(chat_id: int): async def list_admins(chat_id: int):
@ -98,7 +105,12 @@ async def list_admins(chat_id: int):
try: try:
admins_in_chat[chat_id] = { admins_in_chat[chat_id] = {
"last_updated_at": time(), "last_updated_at": time(),
"data": [member.user.id async for member in app.get_chat_members(chat_id, filter=enums.ChatMembersFilter.ADMINISTRATORS)], "data": [
member.user.id
async for member in app.get_chat_members(
chat_id, filter=enums.ChatMembersFilter.ADMINISTRATORS
)
],
} }
return admins_in_chat[chat_id]["data"] return admins_in_chat[chat_id]["data"]
except ChannelPrivate: except ChannelPrivate:
@ -167,7 +179,9 @@ def require_admin(
): ):
def decorator(func): def decorator(func):
@wraps(func) @wraps(func)
async def wrapper(client: Client, message: Union[CallbackQuery, Message], *args, **kwargs): async def wrapper(
client: Client, message: Union[CallbackQuery, Message], *args, **kwargs
):
lang = await get_lang(message) lang = await get_lang(message)
strings = partial( strings = partial(
get_locale_string, get_locale_string,
@ -183,7 +197,9 @@ def require_admin(
sender = message.reply_text sender = message.reply_text
msg = message msg = message
else: else:
raise NotImplementedError(f"require_admin can't process updates with the type '{message.__name__}' yet.") raise NotImplementedError(
f"require_admin can't process updates with the type '{message.__name__}' yet."
)
# We don't actually check private and channel chats. # We don't actually check private and channel chats.
if msg.chat.type == enums.ChatType.PRIVATE: if msg.chat.type == enums.ChatType.PRIVATE:
@ -192,7 +208,9 @@ def require_admin(
return await sender(strings("private_not_allowed")) return await sender(strings("private_not_allowed"))
if msg.chat.type == enums.ChatType.CHANNEL: if msg.chat.type == enums.ChatType.CHANNEL:
return await func(client, message, *args, *kwargs) return await func(client, message, *args, *kwargs)
has_perms = await check_perms(message, permissions, complain_missing_perms, strings) has_perms = await check_perms(
message, permissions, complain_missing_perms, strings
)
if has_perms: if has_perms:
return await func(client, message, *args, *kwargs) return await func(client, message, *args, *kwargs)

View file

@ -187,7 +187,7 @@ async def byPassPh(url: str, name: str):
class Kusonime: class Kusonime:
def __init__(self): def __init__(self):
raise NotImplementedError() pass
@staticmethod @staticmethod
async def byPass(url): async def byPass(url):

View file

@ -93,7 +93,12 @@ async def admin_cache_func(_, cmu):
try: try:
admins_in_chat[cmu.chat.id] = { admins_in_chat[cmu.chat.id] = {
"last_updated_at": time(), "last_updated_at": time(),
"data": [member.user.id async for member in app.get_chat_members(cmu.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS)], "data": [
member.user.id
async for member in app.get_chat_members(
cmu.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS
)
],
} }
LOGGER.info(f"Updated admin cache for {cmu.chat.id} [{cmu.chat.title}]") LOGGER.info(f"Updated admin cache for {cmu.chat.id} [{cmu.chat.title}]")
except: except:
@ -192,7 +197,9 @@ async def kickFunc(client: Client, ctx: Message, strings) -> "Message":
# Ban/DBan/TBan User # Ban/DBan/TBan User
@app.on_message(filters.command(["ban", "dban", "tban"], COMMAND_HANDLER) & filters.group) @app.on_message(
filters.command(["ban", "dban", "tban"], COMMAND_HANDLER) & filters.group
)
@adminsOnly("can_restrict_members") @adminsOnly("can_restrict_members")
@ratelimiter @ratelimiter
@use_chat_lang() @use_chat_lang()
@ -213,7 +220,11 @@ async def banFunc(client, message, strings):
try: try:
mention = (await app.get_users(user_id)).mention mention = (await app.get_users(user_id)).mention
except IndexError: except IndexError:
mention = message.reply_to_message.sender_chat.title if message.reply_to_message else "Anon" mention = (
message.reply_to_message.sender_chat.title
if message.reply_to_message
else "Anon"
)
msg = strings("ban_msg").format( msg = strings("ban_msg").format(
mention=mention, mention=mention,
@ -280,7 +291,9 @@ async def unban_func(self, message, strings):
# Ban users listed in a message # Ban users listed in a message
@app.on_message(filters.user(SUDO) & filters.command("listban", COMMAND_HANDLER) & filters.group) @app.on_message(
filters.user(SUDO) & filters.command("listban", COMMAND_HANDLER) & filters.group
)
@ratelimiter @ratelimiter
@use_chat_lang() @use_chat_lang()
async def list_ban_(c, message, strings): async def list_ban_(c, message, strings):
@ -295,7 +308,9 @@ async def list_ban_(c, message, strings):
lreason = msglink_reason.split() lreason = msglink_reason.split()
messagelink, reason = lreason[0], " ".join(lreason[1:]) messagelink, reason = lreason[0], " ".join(lreason[1:])
if not re.search(r"(https?://)?t(elegram)?\.me/\w+/\d+", messagelink): # validate link if not re.search(
r"(https?://)?t(elegram)?\.me/\w+/\d+", messagelink
): # validate link
return await message.reply_text(strings("invalid_tg_link")) return await message.reply_text(strings("invalid_tg_link"))
if userid == c.me.id: if userid == c.me.id:
@ -333,7 +348,9 @@ async def list_ban_(c, message, strings):
# Unban users listed in a message # Unban users listed in a message
@app.on_message(filters.user(SUDO) & filters.command("listunban", COMMAND_HANDLER) & filters.group) @app.on_message(
filters.user(SUDO) & filters.command("listunban", COMMAND_HANDLER) & filters.group
)
@ratelimiter @ratelimiter
@use_chat_lang() @use_chat_lang()
async def list_unban_(c, message, strings): async def list_unban_(c, message, strings):
@ -365,7 +382,9 @@ async def list_unban_(c, message, strings):
continue continue
count += 1 count += 1
mention = (await app.get_users(userid)).mention mention = (await app.get_users(userid)).mention
msg = strings("listunban_msg").format(mention=mention, uid=userid, frus=message.from_user.mention, ct=count) msg = strings("listunban_msg").format(
mention=mention, uid=userid, frus=message.from_user.mention, ct=count
)
await m.edit_text(msg) await m.edit_text(msg)
@ -387,7 +406,9 @@ async def deleteFunc(_, message, strings):
# Promote Members # Promote Members
@app.on_message(filters.command(["promote", "fullpromote"], COMMAND_HANDLER) & filters.group) @app.on_message(
filters.command(["promote", "fullpromote"], COMMAND_HANDLER) & filters.group
)
@adminsOnly("can_promote_members") @adminsOnly("can_promote_members")
@ratelimiter @ratelimiter
@use_chat_lang() @use_chat_lang()
@ -420,7 +441,9 @@ async def promoteFunc(client, message, strings):
can_manage_video_chats=bot.privileges.can_manage_video_chats, can_manage_video_chats=bot.privileges.can_manage_video_chats,
), ),
) )
return await message.reply_text(strings("full_promote").format(umention=umention)) return await message.reply_text(
strings("full_promote").format(umention=umention)
)
await message.chat.promote_member( await message.chat.promote_member(
user_id=user_id, user_id=user_id,
@ -629,7 +652,11 @@ async def remove_warning(_, cq, strings):
if warns: if warns:
warns = warns["warns"] warns = warns["warns"]
if not warns or warns == 0: if not warns or warns == 0:
return await cq.answer(strings("user_no_warn").format(mention=cq.message.reply_to_message.from_user.id)) return await cq.answer(
strings("user_no_warn").format(
mention=cq.message.reply_to_message.from_user.id
)
)
warn = {"warns": warns - 1} warn = {"warns": warns - 1}
await add_warn(chat_id, await int_to_alpha(user_id), warn) await add_warn(chat_id, await int_to_alpha(user_id), warn)
text = cq.message.text.markdown text = cq.message.text.markdown
@ -723,11 +750,19 @@ async def check_warns(_, message, strings):
warns = warns["warns"] warns = warns["warns"]
else: else:
return await message.reply_text(strings("user_no_warn").format(mention=mention)) return await message.reply_text(strings("user_no_warn").format(mention=mention))
return await message.reply_text(strings("ch_warn_msg").format(mention=mention, warns=warns)) return await message.reply_text(
strings("ch_warn_msg").format(mention=mention, warns=warns)
)
# Report User in Group # Report User in Group
@app.on_message((filters.command("report", COMMAND_HANDLER) | filters.command(["admins", "admin"], prefixes="@")) & filters.group) @app.on_message(
(
filters.command("report", COMMAND_HANDLER)
| filters.command(["admins", "admin"], prefixes="@")
)
& filters.group
)
@capture_err @capture_err
@ratelimiter @ratelimiter
@use_chat_lang() @use_chat_lang()
@ -745,16 +780,27 @@ async def report_user(self: Client, ctx: Message, strings) -> "Message":
if linked_chat is None: if linked_chat is None:
if reply_id in list_of_admins or reply_id == ctx.chat.id: if reply_id in list_of_admins or reply_id == ctx.chat.id:
return await ctx.reply_text(strings("reported_is_admin")) return await ctx.reply_text(strings("reported_is_admin"))
elif reply_id in list_of_admins or reply_id == ctx.chat.id or reply_id == linked_chat.id: elif (
reply_id in list_of_admins
or reply_id == ctx.chat.id
or reply_id == linked_chat.id
):
return await ctx.reply_text(strings("reported_is_admin")) return await ctx.reply_text(strings("reported_is_admin"))
user_mention = reply.from_user.mention if reply.from_user else reply.sender_chat.title user_mention = (
reply.from_user.mention if reply.from_user else reply.sender_chat.title
)
text = strings("report_msg").format(user_mention=user_mention) text = strings("report_msg").format(user_mention=user_mention)
admin_data = [m async for m in app.get_chat_members(ctx.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS)] admin_data = [
m
async for m in app.get_chat_members(
ctx.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS
)
]
for admin in admin_data: for admin in admin_data:
if admin.user.is_bot or admin.user.is_deleted: if admin.user.is_bot or admin.user.is_deleted:
# return bots or deleted admins # return bots or deleted admins
continue continue
text += f"<a href='tg://user?id={admin.user.id}>\u2063</a>" text += f"<a href='tg://user?id={admin.user.id}'>\u2063</a>"
await ctx.reply_msg(text, reply_to_message_id=ctx.reply_to_message.id) await ctx.reply_msg(text, reply_to_message_id=ctx.reply_to_message.id)
@ -766,7 +812,9 @@ async def set_chat_title(self: Client, ctx: Message):
old_title = ctx.chat.title old_title = ctx.chat.title
new_title = ctx.text.split(None, 1)[1] new_title = ctx.text.split(None, 1)[1]
await ctx.chat.set_title(new_title) await ctx.chat.set_title(new_title)
await ctx.reply_text(f"Successfully Changed Group Title From {old_title} To {new_title}") await ctx.reply_text(
f"Successfully Changed Group Title From {old_title} To {new_title}"
)
@app.on_message(filters.command("set_user_title", COMMAND_HANDLER) & ~filters.private) @app.on_message(filters.command("set_user_title", COMMAND_HANDLER) & ~filters.private)
@ -779,10 +827,14 @@ async def set_user_title(self: Client, ctx: Message):
chat_id = ctx.chat.id chat_id = ctx.chat.id
from_user = ctx.reply_to_message.from_user from_user = ctx.reply_to_message.from_user
if len(ctx.command) < 2: if len(ctx.command) < 2:
return await ctx.reply_text("**Usage:**\n/set_user_title NEW ADMINISTRATOR TITLE") return await ctx.reply_text(
"**Usage:**\n/set_user_title NEW ADMINISTRATOR TITLE"
)
title = ctx.text.split(None, 1)[1] title = ctx.text.split(None, 1)[1]
await app.set_administrator_title(chat_id, from_user.id, title) await app.set_administrator_title(chat_id, from_user.id, title)
await ctx.reply_text(f"Successfully Changed {from_user.mention}'s Admin Title To {title}") await ctx.reply_text(
f"Successfully Changed {from_user.mention}'s Admin Title To {title}"
)
@app.on_message(filters.command("set_chat_photo", COMMAND_HANDLER) & ~filters.private) @app.on_message(filters.command("set_chat_photo", COMMAND_HANDLER) & ~filters.private)
@ -795,7 +847,9 @@ async def set_chat_photo(self: Client, ctx: Message):
file = reply.document or reply.photo file = reply.document or reply.photo
if not file: if not file:
return await ctx.reply_text("Reply to a photo or document to set it as chat_photo") return await ctx.reply_text(
"Reply to a photo or document to set it as chat_photo"
)
if file.file_size > 5000000: if file.file_size > 5000000:
return await ctx.reply("File size too large.") return await ctx.reply("File size too large.")

View file

@ -12,14 +12,17 @@ from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL, SUDO, SUPPORT_CHAT
async def ban_reply(self: Client, ctx: Message): async def ban_reply(self: Client, ctx: Message):
if not ctx.from_user: if not ctx.from_user:
return return
ban = await db.get_ban_status(ctx.from_user.id) try:
if (ban.get("is_banned") and ctx.chat.type.value == "private") or ( ban = await db.get_ban_status(ctx.from_user.id)
ban.get("is_banned") and ctx.chat.type.value == "supergroup" and ctx.command if (ban.get("is_banned") and ctx.chat.type.value == "private") or (
): ban.get("is_banned") and ctx.chat.type.value == "supergroup" and ctx.command
await ctx.reply_msg( ):
f'I am sorry, You are banned to use Me. \nBan Reason: {ban["ban_reason"]}' await ctx.reply_msg(
) f'I am sorry, You are banned to use Me. \nBan Reason: {ban["ban_reason"]}'
await ctx.stop_propagation() )
await ctx.stop_propagation()
except:
await ctx.continue_propagation()
@app.on_message(filters.group & filters.incoming, group=-2) @app.on_message(filters.group & filters.incoming, group=-2)
@ -55,7 +58,10 @@ async def grp_bd(self: Client, ctx: Message, strings):
await k.pin() await k.pin()
except: except:
pass pass
await self.leave_chat(ctx.chat.id) try:
await self.leave_chat(ctx.chat.id)
except:
pass
await ctx.stop_propagation() await ctx.stop_propagation()

View file

@ -38,7 +38,9 @@ from utils import demoji
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
LIST_CARI = {} LIST_CARI = {}
headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/600.1.17 (KHTML, like Gecko) Version/7.1 Safari/537.85.10"} headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/600.1.17 (KHTML, like Gecko) Version/7.1 Safari/537.85.10"
}
# IMDB Choose Language # IMDB Choose Language
@ -49,10 +51,12 @@ async def imdb_choose(self: Client, ctx: Message):
if len(ctx.command) == 1: if len(ctx.command) == 1:
return await ctx.reply_msg( return await ctx.reply_msg(
f" Please add query after CMD!\nEx: <code>/{ctx.command[0]} Jurassic World</code>", f" Please add query after CMD!\nEx: <code>/{ctx.command[0]} Jurassic World</code>",
del_in=5, del_in=7,
) )
if ctx.sender_chat: if ctx.sender_chat:
return await ctx.reply_msg("This feature not supported for channel..", del_in=5) return await ctx.reply_msg(
"Cannot identify user, please use in private chat.", del_in=7
)
kuery = ctx.text.split(" ", 1)[1] kuery = ctx.text.split(" ", 1)[1]
is_imdb, lang = await is_imdbset(ctx.from_user.id) is_imdb, lang = await is_imdbset(ctx.from_user.id)
if is_imdb: if is_imdb:
@ -80,7 +84,9 @@ async def imdb_choose(self: Client, ctx: Message):
except ListenerTimeout: except ListenerTimeout:
del LIST_CARI[ranval] del LIST_CARI[ranval]
try: try:
await msg.edit_caption("😶‍🌫️ Callback Query Timeout. Task Has Been Canceled!") await msg.edit_caption(
"😶‍🌫️ Callback Query Timeout. Task Has Been Canceled!"
)
except MessageIdInvalid: except MessageIdInvalid:
pass pass
@ -99,14 +105,20 @@ async def imdblangset(self: Client, query: CallbackQuery):
) )
is_imdb, lang = await is_imdbset(query.from_user.id) is_imdb, lang = await is_imdbset(query.from_user.id)
if is_imdb: if is_imdb:
buttons.row(InlineButton("🗑 Remove UserSetting", f"setimdb#rm#{query.from_user.id}")) buttons.row(
InlineButton("🗑 Remove UserSetting", f"setimdb#rm#{query.from_user.id}")
)
buttons.row(InlineButton("❌ Close", f"close#{query.from_user.id}")) buttons.row(InlineButton("❌ Close", f"close#{query.from_user.id}"))
msg = await query.message.edit_caption("<i>Please select available language below..</i>", reply_markup=buttons) msg = await query.message.edit_caption(
"<i>Please select available language below..</i>", reply_markup=buttons
)
try: try:
await msg.wait_for_click(from_user_id=int(uid), timeout=30) await msg.wait_for_click(from_user_id=int(uid), timeout=30)
except ListenerTimeout: except ListenerTimeout:
try: try:
await msg.edit_caption("😶‍🌫️ Callback Query Timeout. Task Has Been Canceled!") await msg.edit_caption(
"😶‍🌫️ Callback Query Timeout. Task Has Been Canceled!"
)
except MessageIdInvalid: except MessageIdInvalid:
pass pass
@ -123,13 +135,19 @@ async def imdbsetlang(self: Client, query: CallbackQuery):
return await query.answer(f"⚠️ Your Setting Already in ({langset})!", True) return await query.answer(f"⚠️ Your Setting Already in ({langset})!", True)
if lang == "eng": if lang == "eng":
await add_imdbset(query.from_user.id, lang) await add_imdbset(query.from_user.id, lang)
await query.message.edit_caption("Language interface for IMDB has been changed to English.") await query.message.edit_caption(
"Language interface for IMDB has been changed to English."
)
elif lang == "ind": elif lang == "ind":
await add_imdbset(query.from_user.id, lang) await add_imdbset(query.from_user.id, lang)
await query.message.edit_caption("Bahasa tampilan IMDB sudah diubah ke Indonesia.") await query.message.edit_caption(
"Bahasa tampilan IMDB sudah diubah ke Indonesia."
)
else: else:
await remove_imdbset(query.from_user.id) await remove_imdbset(query.from_user.id)
await query.message.edit_caption("UserSetting for IMDB has been deleted from database.") await query.message.edit_caption(
"UserSetting for IMDB has been deleted from database."
)
async def imdb_search_id(kueri, message): async def imdb_search_id(kueri, message):
@ -148,7 +166,9 @@ async def imdb_search_id(kueri, message):
) )
res = r.json().get("d") res = r.json().get("d")
if not res: if not res:
return await k.edit_caption(f"⛔️ Tidak ditemukan hasil untuk kueri: <code>{kueri}</code>") return await k.edit_caption(
f"⛔️ Tidak ditemukan hasil untuk kueri: <code>{kueri}</code>"
)
msg += f"🎬 Ditemukan ({len(res)}) hasil untuk kueri: <code>{kueri}</code>\n\n" msg += f"🎬 Ditemukan ({len(res)}) hasil untuk kueri: <code>{kueri}</code>\n\n"
for num, movie in enumerate(res, start=1): for num, movie in enumerate(res, start=1):
title = movie.get("l") title = movie.get("l")
@ -189,7 +209,9 @@ async def imdb_search_id(kueri, message):
except MessageIdInvalid: except MessageIdInvalid:
pass pass
except Exception as err: except Exception as err:
await k.edit_caption(f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\n<b>ERROR:</b> <code>{err}</code>") await k.edit_caption(
f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\n<b>ERROR:</b> <code>{err}</code>"
)
async def imdb_search_en(kueri, message): async def imdb_search_en(kueri, message):
@ -208,7 +230,9 @@ async def imdb_search_en(kueri, message):
) )
res = r.json().get("d") res = r.json().get("d")
if not res: if not res:
return await k.edit_caption(f"⛔️ Result not found for keywords: <code>{kueri}</code>") return await k.edit_caption(
f"⛔️ Result not found for keywords: <code>{kueri}</code>"
)
msg += f"🎬 Found ({len(res)}) result for keywords: <code>{kueri}</code>\n\n" msg += f"🎬 Found ({len(res)}) result for keywords: <code>{kueri}</code>\n\n"
for num, movie in enumerate(res, start=1): for num, movie in enumerate(res, start=1):
title = movie.get("l") title = movie.get("l")
@ -249,7 +273,9 @@ async def imdb_search_en(kueri, message):
except MessageIdInvalid: except MessageIdInvalid:
pass pass
except Exception as err: except Exception as err:
await k.edit_caption(f"Failed when requesting movies title. Maybe got rate limit or down.\n\n<b>ERROR:</b> <code>{err}</code>") await k.edit_caption(
f"Failed when requesting movies title. Maybe got rate limit or down.\n\n<b>ERROR:</b> <code>{err}</code>"
)
@app.on_callback_query(filters.regex("^imdbcari")) @app.on_callback_query(filters.regex("^imdbcari"))
@ -276,7 +302,9 @@ async def imdbcari(self: Client, query: CallbackQuery):
) )
res = r.json().get("d") res = r.json().get("d")
if not res: if not res:
return await query.message.edit_caption(f"⛔️ Tidak ditemukan hasil untuk kueri: <code>{kueri}</code>") return await query.message.edit_caption(
f"⛔️ Tidak ditemukan hasil untuk kueri: <code>{kueri}</code>"
)
msg += f"🎬 Ditemukan ({len(res)}) hasil dari: <code>{kueri}</code> ~ {query.from_user.mention}\n\n" msg += f"🎬 Ditemukan ({len(res)}) hasil dari: <code>{kueri}</code> ~ {query.from_user.mention}\n\n"
for num, movie in enumerate(res, start=1): for num, movie in enumerate(res, start=1):
title = movie.get("l") title = movie.get("l")
@ -289,10 +317,16 @@ async def imdbcari(self: Client, query: CallbackQuery):
typee = movie.get("q", "N/A").replace("feature", "movie").title() typee = movie.get("q", "N/A").replace("feature", "movie").title()
movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] movieID = re.findall(r"tt(\d+)", movie.get("id"))[0]
msg += f"{num}. {title} {year} - {typee}\n" msg += f"{num}. {title} {year} - {typee}\n"
BTN.append(InlineKeyboardButton(text=num, callback_data=f"imdbres_id#{uid}#{movieID}")) BTN.append(
InlineKeyboardButton(
text=num, callback_data=f"imdbres_id#{uid}#{movieID}"
)
)
BTN.extend( BTN.extend(
( (
InlineKeyboardButton(text="🚩 Language", callback_data=f"imdbset#{uid}"), InlineKeyboardButton(
text="🚩 Language", callback_data=f"imdbset#{uid}"
),
InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"), InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"),
) )
) )
@ -306,7 +340,9 @@ async def imdbcari(self: Client, query: CallbackQuery):
except MessageIdInvalid: except MessageIdInvalid:
pass pass
except Exception as err: except Exception as err:
await query.message.edit_caption(f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\n<b>ERROR:</b> <code>{err}</code>") await query.message.edit_caption(
f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\n<b>ERROR:</b> <code>{err}</code>"
)
else: else:
if query.from_user.id != int(uid): if query.from_user.id != int(uid):
return await query.answer("⚠️ Access Denied!", True) return await query.answer("⚠️ Access Denied!", True)
@ -325,7 +361,9 @@ async def imdbcari(self: Client, query: CallbackQuery):
) )
res = r.json().get("d") res = r.json().get("d")
if not res: if not res:
return await query.message.edit_caption(f"⛔️ Result not found for keywords: <code>{kueri}</code>") return await query.message.edit_caption(
f"⛔️ Result not found for keywords: <code>{kueri}</code>"
)
msg += f"🎬 Found ({len(res)}) result for keywords: <code>{kueri}</code> ~ {query.from_user.mention}\n\n" msg += f"🎬 Found ({len(res)}) result for keywords: <code>{kueri}</code> ~ {query.from_user.mention}\n\n"
for num, movie in enumerate(res, start=1): for num, movie in enumerate(res, start=1):
title = movie.get("l") title = movie.get("l")
@ -338,10 +376,16 @@ async def imdbcari(self: Client, query: CallbackQuery):
typee = movie.get("q", "N/A").replace("feature", "movie").title() typee = movie.get("q", "N/A").replace("feature", "movie").title()
movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] movieID = re.findall(r"tt(\d+)", movie.get("id"))[0]
msg += f"{num}. {title} {year} - {typee}\n" msg += f"{num}. {title} {year} - {typee}\n"
BTN.append(InlineKeyboardButton(text=num, callback_data=f"imdbres_en#{uid}#{movieID}")) BTN.append(
InlineKeyboardButton(
text=num, callback_data=f"imdbres_en#{uid}#{movieID}"
)
)
BTN.extend( BTN.extend(
( (
InlineKeyboardButton(text="🚩 Language", callback_data=f"imdbset#{uid}"), InlineKeyboardButton(
text="🚩 Language", callback_data=f"imdbset#{uid}"
),
InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"), InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"),
) )
) )
@ -355,7 +399,9 @@ async def imdbcari(self: Client, query: CallbackQuery):
except MessageIdInvalid: except MessageIdInvalid:
pass pass
except Exception as err: except Exception as err:
await query.message.edit_caption(f"Failed when requesting movies title. Maybe got rate limit or down.\n\n<b>ERROR:</b> <code>{err}</code>") await query.message.edit_caption(
f"Failed when requesting movies title. Maybe got rate limit or down.\n\n<b>ERROR:</b> <code>{err}</code>"
)
@app.on_callback_query(filters.regex("^imdbres_id")) @app.on_callback_query(filters.regex("^imdbres_id"))
@ -370,78 +416,100 @@ async def imdb_id_callback(self: Client, query: CallbackQuery):
imdb_url = f"https://www.imdb.com/title/tt{movie}/" imdb_url = f"https://www.imdb.com/title/tt{movie}/"
resp = await http.get(imdb_url, headers=headers) resp = await http.get(imdb_url, headers=headers)
sop = BeautifulSoup(resp, "lxml") sop = BeautifulSoup(resp, "lxml")
r_json = json.loads(sop.find("script", attrs={"type": "application/ld+json"}).contents[0]) r_json = json.loads(
sop.find("script", attrs={"type": "application/ld+json"}).contents[0]
)
ott = await search_jw(r_json.get("name"), "ID") ott = await search_jw(r_json.get("name"), "ID")
typee = r_json.get("@type", "") typee = r_json.get("@type", "")
res_str = "" res_str = ""
tahun = re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0] if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text) else "N/A" tahun = (
re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0]
if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)
else "N/A"
)
res_str += f"<b>📹 Judul:</b> <a href='{imdb_url}'>{r_json.get('name')} [{tahun}]</a> (<code>{typee}</code>)\n" res_str += f"<b>📹 Judul:</b> <a href='{imdb_url}'>{r_json.get('name')} [{tahun}]</a> (<code>{typee}</code>)\n"
if aka := r_json.get("alternateName"): if aka := r_json.get("alternateName"):
res_str += f"<b>📢 AKA:</b> <code>{aka}</code>\n\n" res_str += f"<b>📢 AKA:</b> <code>{aka}</code>\n\n"
else: else:
res_str += "\n" res_str += "\n"
if durasi := sop.select('li[data-testid="title-techspec_runtime"]'): if durasi := sop.select('li[data-testid="title-techspec_runtime"]'):
durasi = durasi[0].find(class_="ipc-metadata-list-item__content-container").text durasi = (
durasi[0].find(class_="ipc-metadata-list-item__content-container").text
)
res_str += f"<b>Durasi:</b> <code>{GoogleTranslator('auto', 'id').translate(durasi)}</code>\n" res_str += f"<b>Durasi:</b> <code>{GoogleTranslator('auto', 'id').translate(durasi)}</code>\n"
if kategori := r_json.get("contentRating"): if kategori := r_json.get("contentRating"):
res_str += f"<b>Kategori:</b> <code>{kategori}</code> \n" res_str += f"<b>Kategori:</b> <code>{kategori}</code> \n"
if rating := r_json.get("aggregateRating"): if rating := r_json.get("aggregateRating"):
res_str += f"<b>Peringkat:</b> <code>{rating['ratingValue']}⭐️ dari {rating['ratingCount']} pengguna</code>\n" res_str += f"<b>Peringkat:</b> <code>{rating['ratingValue']}⭐️ dari {rating['ratingCount']} pengguna</code>\n"
if release := sop.select('li[data-testid="title-details-releasedate"]'): if release := sop.select('li[data-testid="title-details-releasedate"]'):
rilis = release[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link").text rilis = (
rilis_url = release[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")["href"] release[0]
res_str += f"<b>Rilis:</b> <a href='https://www.imdb.com{rilis_url}'>{rilis}</a>\n" .find(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)
.text
)
rilis_url = release[0].find(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)["href"]
res_str += (
f"<b>Rilis:</b> <a href='https://www.imdb.com{rilis_url}'>{rilis}</a>\n"
)
if genre := r_json.get("genre"): if genre := r_json.get("genre"):
genre = "".join(f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " if i in GENRES_EMOJI else f"#{i.replace('-', '_').replace(' ', '_')}, " for i in r_json["genre"]) genre = "".join(
genre = genre[:-2] f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, "
res_str += f"<b>Genre:</b> {genre}\n" if i in GENRES_EMOJI
else f"#{i.replace('-', '_').replace(' ', '_')}, "
for i in r_json["genre"]
)
res_str += f"<b>Genre:</b> {genre[:-2]}\n"
if negara := sop.select('li[data-testid="title-details-origin"]'): if negara := sop.select('li[data-testid="title-details-origin"]'):
country = "".join(f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " for country in negara[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")) country = "".join(
country = country[:-2] f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, "
res_str += f"<b>Negara:</b> {country}\n" for country in negara[0].findAll(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)
)
res_str += f"<b>Negara:</b> {country[:-2]}\n"
if bahasa := sop.select('li[data-testid="title-details-languages"]'): if bahasa := sop.select('li[data-testid="title-details-languages"]'):
language = "".join(f"#{lang.text.replace(' ', '_').replace('-', '_')}, " for lang in bahasa[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")) language = "".join(
language = language[:-2] f"#{lang.text.replace(' ', '_').replace('-', '_')}, "
res_str += f"<b>Bahasa:</b> {language}\n" for lang in bahasa[0].findAll(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)
)
res_str += f"<b>Bahasa:</b> {language[:-2]}\n"
res_str += "\n<b>🙎 Info Cast:</b>\n" res_str += "\n<b>🙎 Info Cast:</b>\n"
if directors := r_json.get("director"): if directors := r_json.get("director"):
director = "" director = director = "".join(
for i in directors: f"<a href='{i['url']}'>{i['name']}</a>, " for i in r_json["director"]
name = i["name"] )
url = i["url"] res_str += f"<b>Sutradara:</b> {director[:-2]}\n"
director += f"<a href='{url}'>{name}</a>, "
director = director[:-2]
res_str += f"<b>Sutradara:</b> {director}\n"
if creators := r_json.get("creator"): if creators := r_json.get("creator"):
creator = "" creator = "".join(
for i in creators: f"<a href='{i['url']}'>{i['name']}</a>, "
if i["@type"] == "Person": for i in r_json["creator"]
name = i["name"] if i["@type"] == "Person"
url = i["url"] )
creator += f"<a href='{url}'>{name}</a>, " res_str += f"<b>Penulis:</b> {creator[:-2]}\n"
creator = creator[:-2]
res_str += f"<b>Penulis:</b> {creator}\n"
if actor := r_json.get("actor"): if actor := r_json.get("actor"):
actors = "" actors = "".join(
for i in actor: f"<a href='{i['url']}'>{i['name']}</a>, " for i in r_json["actor"]
name = i["name"] )
url = i["url"] res_str += f"<b>Pemeran:</b> {actors[:-2]}\n\n"
actors += f"<a href='{url}'>{name}</a>, "
actors = actors[:-2]
res_str += f"<b>Pemeran:</b> {actors}\n\n"
if deskripsi := r_json.get("description"): if deskripsi := r_json.get("description"):
summary = GoogleTranslator("auto", "id").translate(deskripsi) summary = GoogleTranslator("auto", "id").translate(deskripsi)
res_str += f"<b>📜 Plot: </b> <code>{summary}</code>\n\n" res_str += f"<b>📜 Plot: </b> <code>{summary}</code>\n\n"
if keywd := r_json.get("keywords"): if keywd := r_json.get("keywords"):
keywords = keywd.split(",") key_ = "".join(
key_ = "" f"#{i.replace(' ', '_').replace('-', '_')}, "
for i in keywords: for i in r_json["keywords"].split(",")
i = i.replace(" ", "_").replace("-", "_") )
key_ += f"#{i}, " res_str += f"<b>🔥 Kata Kunci:</b> {key_[:-2]} \n"
key_ = key_[:-2]
res_str += f"<b>🔥 Kata Kunci:</b> {key_} \n"
if award := sop.select('li[data-testid="award_information"]'): if award := sop.select('li[data-testid="award_information"]'):
awards = award[0].find(class_="ipc-metadata-list-item__list-content-item").text awards = (
award[0].find(class_="ipc-metadata-list-item__list-content-item").text
)
res_str += f"<b>🏆 Penghargaan:</b> <code>{GoogleTranslator('auto', 'id').translate(awards)}</code>\n" res_str += f"<b>🏆 Penghargaan:</b> <code>{GoogleTranslator('auto', 'id').translate(awards)}</code>\n"
else: else:
res_str += "\n" res_str += "\n"
@ -459,25 +527,37 @@ async def imdb_id_callback(self: Client, query: CallbackQuery):
] ]
) )
else: else:
markup = InlineKeyboardMarkup([[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]]) markup = InlineKeyboardMarkup(
[[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]]
)
if thumb := r_json.get("image"): if thumb := r_json.get("image"):
try: try:
await query.message.edit_media( await query.message.edit_media(
InputMediaPhoto(thumb, caption=res_str, parse_mode=enums.ParseMode.HTML), InputMediaPhoto(
thumb, caption=res_str, parse_mode=enums.ParseMode.HTML
),
reply_markup=markup, reply_markup=markup,
) )
except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty): except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty):
poster = thumb.replace(".jpg", "._V1_UX360.jpg") poster = thumb.replace(".jpg", "._V1_UX360.jpg")
await query.message.edit_media( await query.message.edit_media(
InputMediaPhoto(poster, caption=res_str, parse_mode=enums.ParseMode.HTML), InputMediaPhoto(
poster, caption=res_str, parse_mode=enums.ParseMode.HTML
),
reply_markup=markup, reply_markup=markup,
) )
except MediaCaptionTooLong: except MediaCaptionTooLong:
await query.message.reply(res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup) await query.message.reply(
res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup
)
except Exception: except Exception:
await query.message.edit_caption(res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup) await query.message.edit_caption(
res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup
)
else: else:
await query.message.edit_caption(res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup) await query.message.edit_caption(
res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup
)
except (MessageNotModified, MessageIdInvalid): except (MessageNotModified, MessageIdInvalid):
pass pass
@ -494,77 +574,99 @@ async def imdb_en_callback(self: Client, query: CallbackQuery):
imdb_url = f"https://www.imdb.com/title/tt{movie}/" imdb_url = f"https://www.imdb.com/title/tt{movie}/"
resp = await http.get(imdb_url, headers=headers) resp = await http.get(imdb_url, headers=headers)
sop = BeautifulSoup(resp, "lxml") sop = BeautifulSoup(resp, "lxml")
r_json = json.loads(sop.find("script", attrs={"type": "application/ld+json"}).contents[0]) r_json = json.loads(
sop.find("script", attrs={"type": "application/ld+json"}).contents[0]
)
ott = await search_jw(r_json.get("name"), "US") ott = await search_jw(r_json.get("name"), "US")
typee = r_json.get("@type", "") typee = r_json.get("@type", "")
res_str = "" res_str = ""
tahun = re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0] if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text) else "N/A" tahun = (
re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0]
if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)
else "N/A"
)
res_str += f"<b>📹 Judul:</b> <a href='{imdb_url}'>{r_json.get('name')} [{tahun}]</a> (<code>{typee}</code>)\n" res_str += f"<b>📹 Judul:</b> <a href='{imdb_url}'>{r_json.get('name')} [{tahun}]</a> (<code>{typee}</code>)\n"
if aka := r_json.get("alternateName"): if aka := r_json.get("alternateName"):
res_str += f"<b>📢 AKA:</b> <code>{aka}</code>\n\n" res_str += f"<b>📢 AKA:</b> <code>{aka}</code>\n\n"
else: else:
res_str += "\n" res_str += "\n"
if durasi := sop.select('li[data-testid="title-techspec_runtime"]'): if durasi := sop.select('li[data-testid="title-techspec_runtime"]'):
durasi = durasi[0].find(class_="ipc-metadata-list-item__content-container").text durasi = (
durasi[0].find(class_="ipc-metadata-list-item__content-container").text
)
res_str += f"<b>Duration:</b> <code>{durasi}</code>\n" res_str += f"<b>Duration:</b> <code>{durasi}</code>\n"
if kategori := r_json.get("contentRating"): if kategori := r_json.get("contentRating"):
res_str += f"<b>Category:</b> <code>{kategori}</code> \n" res_str += f"<b>Category:</b> <code>{kategori}</code> \n"
if rating := r_json.get("aggregateRating"): if rating := r_json.get("aggregateRating"):
res_str += f"<b>Rating:</b> <code>{rating['ratingValue']}⭐️ from {rating['ratingCount']} users</code>\n" res_str += f"<b>Rating:</b> <code>{rating['ratingValue']}⭐️ from {rating['ratingCount']} users</code>\n"
if release := sop.select('li[data-testid="title-details-releasedate"]'): if release := sop.select('li[data-testid="title-details-releasedate"]'):
rilis = release[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link").text rilis = (
rilis_url = release[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")["href"] release[0]
res_str += f"<b>Rilis:</b> <a href='https://www.imdb.com{rilis_url}'>{rilis}</a>\n" .find(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)
.text
)
rilis_url = release[0].find(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)["href"]
res_str += (
f"<b>Rilis:</b> <a href='https://www.imdb.com{rilis_url}'>{rilis}</a>\n"
)
if genre := r_json.get("genre"): if genre := r_json.get("genre"):
genre = "".join(f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " if i in GENRES_EMOJI else f"#{i.replace('-', '_').replace(' ', '_')}, " for i in r_json["genre"]) genre = "".join(
genre = genre[:-2] f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, "
res_str += f"<b>Genre:</b> {genre}\n" if i in GENRES_EMOJI
else f"#{i.replace('-', '_').replace(' ', '_')}, "
for i in r_json["genre"]
)
res_str += f"<b>Genre:</b> {genre[:-2]}\n"
if negara := sop.select('li[data-testid="title-details-origin"]'): if negara := sop.select('li[data-testid="title-details-origin"]'):
country = "".join(f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " for country in negara[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")) country = "".join(
country = country[:-2] f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, "
res_str += f"<b>Country:</b> {country}\n" for country in negara[0].findAll(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)
)
res_str += f"<b>Country:</b> {country[:-2]}\n"
if bahasa := sop.select('li[data-testid="title-details-languages"]'): if bahasa := sop.select('li[data-testid="title-details-languages"]'):
language = "".join(f"#{lang.text.replace(' ', '_').replace('-', '_')}, " for lang in bahasa[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")) language = "".join(
language = language[:-2] f"#{lang.text.replace(' ', '_').replace('-', '_')}, "
res_str += f"<b>Language:</b> {language}\n" for lang in bahasa[0].findAll(
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
)
)
res_str += f"<b>Language:</b> {language[:-2]}\n"
res_str += "\n<b>🙎 Cast Info:</b>\n" res_str += "\n<b>🙎 Cast Info:</b>\n"
if directors := r_json.get("director"): if r_json.get("director"):
director = "" director = "".join(
for i in directors: f"<a href='{i['url']}'>{i['name']}</a>, " for i in r_json["director"]
name = i["name"] )
url = i["url"] res_str += f"<b>Director:</b> {director[:-2]}\n"
director += f"<a href='{url}'>{name}</a>, " if r_json.get("creator"):
director = director[:-2] creator = "".join(
res_str += f"<b>Director:</b> {director}\n" f"<a href='{i['url']}'>{i['name']}</a>, "
if creators := r_json.get("creator"): for i in r_json["creator"]
creator = "" if i["@type"] == "Person"
for i in creators: )
if i["@type"] == "Person": res_str += f"<b>Writer:</b> {creator[-2]}\n"
name = i["name"] if r_json.get("actor"):
url = i["url"] actors = actors = "".join(
creator += f"<a href='{url}'>{name}</a>, " f"<a href='{i['url']}'>{i['name']}</a>, " for i in r_json["actor"]
creator = creator[:-2] )
res_str += f"<b>Writer:</b> {creator}\n" res_str += f"<b>Stars:</b> {actors[:-2]}\n\n"
if actor := r_json.get("actor"):
actors = ""
for i in actor:
name = i["name"]
url = i["url"]
actors += f"<a href='{url}'>{name}</a>, "
actors = actors[:-2]
res_str += f"<b>Stars:</b> {actors}\n\n"
if description := r_json.get("description"): if description := r_json.get("description"):
res_str += f"<b>📜 Summary: </b> <code>{description}</code>\n\n" res_str += f"<b>📜 Summary: </b> <code>{description}</code>\n\n"
if keywd := r_json.get("keywords"): if r_json.get("keywords"):
keywords = keywd.split(",") key_ = "".join(
key_ = "" f"#{i.replace(' ', '_').replace('-', '_')}, "
for i in keywords: for i in r_json["keywords"].split(",")
i = i.replace(" ", "_").replace("-", "_") )
key_ += f"#{i}, " res_str += f"<b>🔥 Keywords:</b> {key_[:-2]} \n"
key_ = key_[:-2]
res_str += f"<b>🔥 Keywords:</b> {key_} \n"
if award := sop.select('li[data-testid="award_information"]'): if award := sop.select('li[data-testid="award_information"]'):
awards = award[0].find(class_="ipc-metadata-list-item__list-content-item").text awards = (
award[0].find(class_="ipc-metadata-list-item__list-content-item").text
)
res_str += f"<b>🏆 Awards:</b> <code>{awards}</code>\n" res_str += f"<b>🏆 Awards:</b> <code>{awards}</code>\n"
else: else:
res_str += "\n" res_str += "\n"
@ -582,24 +684,36 @@ async def imdb_en_callback(self: Client, query: CallbackQuery):
] ]
) )
else: else:
markup = InlineKeyboardMarkup([[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]]) markup = InlineKeyboardMarkup(
[[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]]
)
if thumb := r_json.get("image"): if thumb := r_json.get("image"):
try: try:
await query.message.edit_media( await query.message.edit_media(
InputMediaPhoto(thumb, caption=res_str, parse_mode=enums.ParseMode.HTML), InputMediaPhoto(
thumb, caption=res_str, parse_mode=enums.ParseMode.HTML
),
reply_markup=markup, reply_markup=markup,
) )
except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty): except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty):
poster = thumb.replace(".jpg", "._V1_UX360.jpg") poster = thumb.replace(".jpg", "._V1_UX360.jpg")
await query.message.edit_media( await query.message.edit_media(
InputMediaPhoto(poster, caption=res_str, parse_mode=enums.ParseMode.HTML), InputMediaPhoto(
poster, caption=res_str, parse_mode=enums.ParseMode.HTML
),
reply_markup=markup, reply_markup=markup,
) )
except MediaCaptionTooLong: except MediaCaptionTooLong:
await query.message.reply(res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup) await query.message.reply(
res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup
)
except Exception: except Exception:
await query.message.edit_caption(res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup) await query.message.edit_caption(
res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup
)
else: else:
await query.message.edit_caption(res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup) await query.message.edit_caption(
res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup
)
except (MessageNotModified, MessageIdInvalid): except (MessageNotModified, MessageIdInvalid):
pass pass

View file

@ -659,8 +659,7 @@ async def imdb_inl(_, query):
else f"#{i.replace('-', '_').replace(' ', '_')}, " else f"#{i.replace('-', '_').replace(' ', '_')}, "
for i in r_json["genre"] for i in r_json["genre"]
) )
genre = genre[:-2] res_str += f"<b>Genre:</b> {genre[:-2]}\n"
res_str += f"<b>Genre:</b> {genre}\n"
if negara := sop.select('li[data-testid="title-details-origin"]'): if negara := sop.select('li[data-testid="title-details-origin"]'):
country = "".join( country = "".join(
f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, "
@ -668,8 +667,7 @@ async def imdb_inl(_, query):
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
) )
) )
country = country[:-2] res_str += f"<b>Negara:</b> {country[:-2]}\n"
res_str += f"<b>Negara:</b> {country}\n"
if bahasa := sop.select('li[data-testid="title-details-languages"]'): if bahasa := sop.select('li[data-testid="title-details-languages"]'):
language = "".join( language = "".join(
f"#{lang.text.replace(' ', '_').replace('-', '_')}, " f"#{lang.text.replace(' ', '_').replace('-', '_')}, "
@ -677,47 +675,37 @@ async def imdb_inl(_, query):
class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link"
) )
) )
language = language[:-2] res_str += f"<b>Bahasa:</b> {language[:-2]}\n"
res_str += f"<b>Bahasa:</b> {language}\n"
res_str += "\n<b>🙎 Info Cast:</b>\n" res_str += "\n<b>🙎 Info Cast:</b>\n"
if r_json.get("director"): if r_json.get("director"):
director = "" director = "".join(
for i in r_json["director"]: f"<a href='{i['url']}'>{i['name']}</a>, "
name = i["name"] for i in r_json["director"]
url = i["url"] )
director += f"<a href='{url}'>{name}</a>, " res_str += f"<b>Sutradara:</b> {director[:-2]}\n"
director = director[:-2]
res_str += f"<b>Sutradara:</b> {director}\n"
if r_json.get("creator"): if r_json.get("creator"):
creator = "" creator = "".join(
for i in r_json["creator"]: f"<a href='{i['url']}'>{i['name']}</a>, "
if i["@type"] == "Person": for i in r_json["creator"]
name = i["name"] if i["@type"] == "Person"
url = i["url"] )
creator += f"<a href='{url}'>{name}</a>, " res_str += f"<b>Penulis:</b> {creator[:-2]}\n"
creator = creator[:-2]
res_str += f"<b>Penulis:</b> {creator}\n"
if r_json.get("actor"): if r_json.get("actor"):
actors = "" actors = "".join(
for i in r_json["actor"]: f"<a href='{i['url']}'>{i['name']}</a>, " for i in r_json["actor"]
name = i["name"] )
url = i["url"] res_str += f"<b>Pemeran:</b> {actors[:-2]}\n\n"
actors += f"<a href='{url}'>{name}</a>, "
actors = actors[:-2]
res_str += f"<b>Pemeran:</b> {actors}\n\n"
if r_json.get("description"): if r_json.get("description"):
summary = GoogleTranslator("auto", "id").translate( summary = GoogleTranslator("auto", "id").translate(
r_json.get("description") r_json.get("description")
) )
res_str += f"<b>📜 Plot: </b> <code>{summary}</code>\n\n" res_str += f"<b>📜 Plot: </b> <code>{summary}</code>\n\n"
if r_json.get("keywords"): if r_json.get("keywords"):
keywords = r_json["keywords"].split(",") key_ = "".join(
key_ = "" f"#{i.replace(' ', '_').replace('-', '_')}, "
for i in keywords: for i in r_json["keywords"].split(",")
i = i.replace(" ", "_").replace("-", "_") )
key_ += f"#{i}, " res_str += f"<b>🔥 Kata Kunci:</b> {key_[:-2]} \n"
key_ = key_[:-2]
res_str += f"<b>🔥 Kata Kunci:</b> {key_} \n"
if award := sop.select('li[data-testid="award_information"]'): if award := sop.select('li[data-testid="award_information"]'):
awards = ( awards = (
award[0] award[0]

View file

@ -59,7 +59,11 @@ async def chlang(c: Client, m: Union[CallbackQuery, Message], strings):
keyboard = InlineKeyboardMarkup( keyboard = InlineKeyboardMarkup(
inline_keyboard=[ inline_keyboard=[
*gen_langs_kb(), *gen_langs_kb(),
[InlineKeyboardButton(strings("back_btn", context="general"), callback_data="start_back")], [
InlineKeyboardButton(
strings("back_btn", context="general"), callback_data="start_back"
)
],
] ]
) )
@ -69,8 +73,14 @@ async def chlang(c: Client, m: Union[CallbackQuery, Message], strings):
else: else:
msg = m msg = m
sender = msg.reply_text sender = msg.reply_text
if not msg.from_user:
return
res = strings("language_changer_private") if msg.chat.type == ChatType.PRIVATE else strings("language_changer_chat") res = (
strings("language_changer_private")
if msg.chat.type == ChatType.PRIVATE
else strings("language_changer_chat")
)
msg = await sender(res, reply_markup=keyboard) msg = await sender(res, reply_markup=keyboard)
try: try:
await msg.wait_for_click(from_user_id=m.from_user.id, timeout=30) await msg.wait_for_click(from_user_id=m.from_user.id, timeout=30)
@ -105,4 +115,6 @@ async def set_chat_lang(c: Client, m: CallbackQuery, strings):
) )
else: else:
keyboard = None keyboard = None
await m.message.edit_text(strings("language_changed_successfully"), reply_markup=keyboard) await m.message.edit_msg(
strings("language_changed_successfully"), reply_markup=keyboard
)

View file

@ -220,11 +220,15 @@ async def getDataKuso(msg, kueri, CurrentPage, user, strings):
kusoResult += f"<b>{index*6+c}</b>. {i['title']}\n{i['link']}\n\n" kusoResult += f"<b>{index*6+c}</b>. {i['title']}\n{i['link']}\n\n"
if c < 6: if c < 6:
extractbtn1.append( extractbtn1.append(
InlineButton(c, f"kusoextract#{CurrentPage}#{c}#{user}#{msg.id}") InlineButton(
index * 6 + c, f"kusoextract#{CurrentPage}#{c}#{user}#{msg.id}"
)
) )
else: else:
extractbtn2.append( extractbtn2.append(
InlineButton(c, f"kusoextract#{CurrentPage}#{c}#{user}#{msg.id}") InlineButton(
index * 6 + c, f"kusoextract#{CurrentPage}#{c}#{user}#{msg.id}"
)
) )
kusoResult = "".join(i for i in kusoResult if i not in "[]") kusoResult = "".join(i for i in kusoResult if i not in "[]")
return kusoResult, PageLen, extractbtn1, extractbtn2 return kusoResult, PageLen, extractbtn1, extractbtn2
@ -316,7 +320,9 @@ async def getDataSavefilm21(msg, kueri, CurrentPage, user, strings):
for c, i in enumerate(SCRAP_DICT[msg.id][0][index], start=1): for c, i in enumerate(SCRAP_DICT[msg.id][0][index], start=1):
sfResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>Genre:</b> {i['genre']}\n\n" sfResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>Genre:</b> {i['genre']}\n\n"
extractbtn.append( extractbtn.append(
InlineButton(c, f"sf21extract#{CurrentPage}#{c}#{user}#{msg.id}") InlineButton(
index * 6 + c, f"sf21extract#{CurrentPage}#{c}#{user}#{msg.id}"
)
) )
sfResult = "".join(i for i in sfResult if i not in "[]") sfResult = "".join(i for i in sfResult if i not in "[]")
return sfResult, PageLen, extractbtn return sfResult, PageLen, extractbtn
@ -366,7 +372,9 @@ async def getDataLendrive(msg, kueri, CurrentPage, user, strings):
for c, i in enumerate(SCRAP_DICT[msg.id][0][index], start=1): for c, i in enumerate(SCRAP_DICT[msg.id][0][index], start=1):
lenddataResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>{strings('quality')}:</b> {i['quality']}\n<b>Status:</b> {i['status']}\n\n" lenddataResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>{strings('quality')}:</b> {i['quality']}\n<b>Status:</b> {i['status']}\n\n"
extractbtn.append( extractbtn.append(
InlineButton(c, f"lendriveextract#{CurrentPage}#{c}#{user}#{msg.id}") InlineButton(
index * 6 + c, f"lendriveextract#{CurrentPage}#{c}#{user}#{msg.id}"
)
) )
lenddataResult = "".join(i for i in lenddataResult if i not in "[]") lenddataResult = "".join(i for i in lenddataResult if i not in "[]")
return lenddataResult, PageLen, extractbtn return lenddataResult, PageLen, extractbtn
@ -415,7 +423,9 @@ async def getDataMelong(msg, kueri, CurrentPage, user, strings):
for c, i in enumerate(SCRAP_DICT[msg.id][0][index], start=1): for c, i in enumerate(SCRAP_DICT[msg.id][0][index], start=1):
melongResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>{strings('quality')}:</b> {i['quality']}\n\n" melongResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>{strings('quality')}:</b> {i['quality']}\n\n"
extractbtn.append( extractbtn.append(
InlineButton(c, f"melongextract#{CurrentPage}#{c}#{user}#{msg.id}") InlineButton(
index * 6 + c, f"melongextract#{CurrentPage}#{c}#{user}#{msg.id}"
)
) )
melongResult = "".join(i for i in melongResult if i not in "[]") melongResult = "".join(i for i in melongResult if i not in "[]")
return melongResult, PageLen, extractbtn return melongResult, PageLen, extractbtn
@ -466,7 +476,9 @@ async def getDataGomov(msg, kueri, CurrentPage, user, strings):
gomovResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>Genre:</b> <code>{i['genre']}</code>\n\n" gomovResult += f"<b>{index*6+c}. <a href='{i['link']}'>{i['judul']}</a></b>\n<b>Genre:</b> <code>{i['genre']}</code>\n\n"
if not re.search(r"Series", i["genre"]): if not re.search(r"Series", i["genre"]):
extractbtn.append( extractbtn.append(
InlineButton(c, f"gomovextract#{CurrentPage}#{c}#{user}#{msg.id}") InlineButton(
index * 6 + c, f"gomovextract#{CurrentPage}#{c}#{user}#{msg.id}"
)
) )
gomovResult += strings("unsupport_dl_btn") gomovResult += strings("unsupport_dl_btn")
gomovResult = "".join(i for i in gomovResult if i not in "[]") gomovResult = "".join(i for i in gomovResult if i not in "[]")

View file

@ -108,7 +108,10 @@ async def ytdownv2(self: Client, ctx: Message, strings):
) )
await msg.wait_for_click(from_user_id=ctx.from_user.id, timeout=30) await msg.wait_for_click(from_user_id=ctx.from_user.id, timeout=30)
except ListenerTimeout: except ListenerTimeout:
await msg.edit_caption(strings("exp_task", context="general")) try:
await msg.edit_caption(strings("exp_task", context="general"))
except MessageIdInvalid:
pass
except Exception as err: except Exception as err:
await ctx.reply_msg(f"Opps, ERROR: {str(err)}") await ctx.reply_msg(f"Opps, ERROR: {str(err)}")