diff --git a/misskaty/core/decorator/errors.py b/misskaty/core/decorator/errors.py index 77c20b0a..4733aa76 100644 --- a/misskaty/core/decorator/errors.py +++ b/misskaty/core/decorator/errors.py @@ -56,7 +56,7 @@ def capture_err(func): try: await app.send_message(LOG_CHANNEL, x) await message.reply(x) - except FloodWait as e: + except FloodWait: await asyncio.sleep(x.value) raise err diff --git a/misskaty/plugins/admin.py b/misskaty/plugins/admin.py index ee00834e..7b8cd4ff 100644 --- a/misskaty/plugins/admin.py +++ b/misskaty/plugins/admin.py @@ -60,12 +60,7 @@ async def admin_cache_func(_, cmu): try: admins_in_chat[cmu.chat.id] = { "last_updated_at": time(), - "data": [ - member.user.id - async for member in app.get_chat_members( - cmu.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS - ) - ], + "data": [member.user.id async for member in app.get_chat_members(cmu.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS)], } LOGGER.info(f"Updated admin cache for {cmu.chat.id} [{cmu.chat.title}]") except: @@ -155,9 +150,7 @@ async def kickFunc(client, message): # Ban/DBan/TBan User -@app.on_message( - filters.command(["ban", "dban", "tban"], COMMAND_HANDLER) & ~filters.private -) +@app.on_message(filters.command(["ban", "dban", "tban"], COMMAND_HANDLER) & ~filters.private) @adminsOnly("can_restrict_members") async def banFunc(client, message): user_id, reason = await extract_user_and_reason(message, sender_chat=True) @@ -169,23 +162,14 @@ async def banFunc(client, message): if user_id in SUDO: return await message.reply_text("You Wanna Ban The Elevated One?, RECONSIDER!") if user_id in (await list_admins(message.chat.id)): - return await message.reply_text( - "I can't ban an admin, You know the rules, so do i." - ) + return await message.reply_text("I can't ban an admin, You know the rules, so do i.") try: mention = (await app.get_users(user_id)).mention except IndexError: - mention = ( - message.reply_to_message.sender_chat.title - if message.reply_to_message - else "Anon" - ) + mention = message.reply_to_message.sender_chat.title if message.reply_to_message else "Anon" - msg = ( - f"**Banned User:** {mention}\n" - f"**Banned By:** {message.from_user.mention if message.from_user else 'Anon'}\n" - ) + msg = f"**Banned User:** {mention}\n" f"**Banned By:** {message.from_user.mention if message.from_user else 'Anon'}\n" if message.command[0][0] == "d": await message.reply_to_message.delete() if message.command[0] == "tban": @@ -233,33 +217,25 @@ async def unban_func(_, message): elif len(message.command) == 1 and reply: user = message.reply_to_message.from_user.id else: - return await message.reply_text( - "Provide a username or reply to a user's message to unban." - ) + return await message.reply_text("Provide a username or reply to a user's message to unban.") await message.chat.unban_member(user) umention = (await app.get_users(user)).mention await message.reply_text(f"Unbanned! {umention}") # Ban users listed in a message -@app.on_message( - filters.user(SUDO) & filters.command("listban", COMMAND_HANDLER) & ~filters.private -) +@app.on_message(filters.user(SUDO) & filters.command("listban", COMMAND_HANDLER) & ~filters.private) async def list_ban_(c, message): userid, msglink_reason = await extract_user_and_reason(message) if not userid or not msglink_reason: - return await message.reply_text( - "Provide a userid/username along with message link and reason to list-ban" - ) + return await message.reply_text("Provide a userid/username along with message link and reason to list-ban") if len(msglink_reason.split(" ")) == 1: # message link included with the reason return await message.reply_text("You must provide a reason to list-ban") # seperate messge link from reason lreason = msglink_reason.split() messagelink, reason = lreason[0], " ".join(lreason[1:]) - if not re.search( - r"(https?://)?t(elegram)?\.me/\w+/\d+", messagelink - ): # validate link + if not re.search(r"(https?://)?t(elegram)?\.me/\w+/\d+", messagelink): # validate link return await message.reply_text("Invalid message link provided") if userid == c.me.id: @@ -268,9 +244,7 @@ async def list_ban_(c, message): return await message.reply_text("You Wanna Ban The Elevated One?, RECONSIDER!") splitted = messagelink.split("/") uname, mid = splitted[-2], int(splitted[-1]) - m = await message.reply_text( - "`Banning User from multiple groups. This may take some time`" - ) + m = await message.reply_text("`Banning User from multiple groups. This may take some time`") try: msgtext = (await app.get_messages(uname, mid)).text gusernames = re.findall("@\w+", msgtext) @@ -299,17 +273,11 @@ async def list_ban_(c, message): # Unban users listed in a message -@app.on_message( - filters.user(SUDO) - & filters.command("listunban", COMMAND_HANDLER) - & ~filters.private -) +@app.on_message(filters.user(SUDO) & filters.command("listunban", COMMAND_HANDLER) & ~filters.private) async def list_unban_(c, message): userid, msglink = await extract_user_and_reason(message) if not userid or not msglink: - return await message.reply_text( - "Provide a userid/username along with message link to list-unban" - ) + return await message.reply_text("Provide a userid/username along with message link to list-unban") if not re.search(r"(https?://)?t(elegram)?\.me/\w+/\d+", msglink): # validate link return await message.reply_text("Invalid message link provided") @@ -359,9 +327,7 @@ async def deleteFunc(_, message): # Promote Members -@app.on_message( - filters.command(["promote", "fullpromote"], COMMAND_HANDLER) & ~filters.private -) +@app.on_message(filters.command(["promote", "fullpromote"], COMMAND_HANDLER) & ~filters.private) @adminsOnly("can_promote_members") async def promoteFunc(client, message): try: @@ -471,15 +437,10 @@ async def mute(client, message): if user_id in SUDO: return await message.reply_text("You wanna mute the elevated one?, RECONSIDER!") if user_id in (await list_admins(message.chat.id)): - return await message.reply_text( - "I can't mute an admin, You know the rules, so do i." - ) + return await message.reply_text("I can't mute an admin, You know the rules, so do i.") mention = (await app.get_users(user_id)).mention keyboard = ikb({"🚨 Unmute 🚨": f"unmute_{user_id}"}) - msg = ( - f"**Muted User:** {mention}\n" - f"**Muted By:** {message.from_user.mention if message.from_user else 'Anon'}\n" - ) + msg = f"**Muted User:** {mention}\n" f"**Muted By:** {message.from_user.mention if message.from_user else 'Anon'}\n" if message.command[0] == "tmute": split = reason.split(None, 1) time_value = split[0] @@ -555,9 +516,7 @@ async def warn_user(client, message): if user_id in SUDO: return await message.reply_text("You Wanna Warn The Elevated One?, RECONSIDER!") if user_id in (await list_admins(chat_id)): - return await message.reply_text( - "I can't warn an admin, You know the rules, so do i." - ) + return await message.reply_text("I can't warn an admin, You know the rules, so do i.") user, warns = await asyncio.gather( app.get_users(user_id), get_warn(chat_id, await int_to_alpha(user_id)), @@ -590,8 +549,7 @@ async def remove_warning(_, cq): permission = "can_restrict_members" if permission not in permissions: return await cq.answer( - "You don't have enough permissions to perform this action.\n" - + f"Permission needed: {permission}", + "You don't have enough permissions to perform this action.\n" + f"Permission needed: {permission}", show_alert=True, ) user_id = cq.data.split("_")[1] @@ -616,8 +574,7 @@ async def unmute_user(_, cq): permission = "can_restrict_members" if permission not in permissions: return await cq.answer( - "You don't have enough permissions to perform this action.\n" - + f"Permission needed: {permission}", + "You don't have enough permissions to perform this action.\n" + f"Permission needed: {permission}", show_alert=True, ) user_id = cq.data.split("_")[1] @@ -636,8 +593,7 @@ async def unban_user(_, cq): permission = "can_restrict_members" if permission not in permissions: return await cq.answer( - "You don't have enough permissions to perform this action.\n" - + f"Permission needed: {permission}", + "You don't have enough permissions to perform this action.\n" + f"Permission needed: {permission}", show_alert=True, ) user_id = cq.data.split("_")[1] @@ -654,9 +610,7 @@ async def unban_user(_, cq): @adminsOnly("can_restrict_members") async def remove_warnings(_, message): if not message.reply_to_message: - return await message.reply_text( - "Reply to a message to remove a user's warnings." - ) + return await message.reply_text("Reply to a message to remove a user's warnings.") user_id = message.reply_to_message.from_user.id mention = message.reply_to_message.from_user.mention chat_id = message.chat.id @@ -687,13 +641,7 @@ async def check_warns(_, message): # Report User in Group -@app.on_message( - ( - filters.command("report", COMMAND_HANDLER) - | filters.command(["admins", "admin"], prefixes="@") - ) - & ~filters.private -) +@app.on_message((filters.command("report", COMMAND_HANDLER) | filters.command(["admins", "admin"], prefixes="@")) & ~filters.private) @capture_err async def report_user(_, message): if not message.reply_to_message: @@ -708,28 +656,13 @@ async def report_user(_, message): linked_chat = (await app.get_chat(message.chat.id)).linked_chat if linked_chat is None: if reply_id in list_of_admins or reply_id == message.chat.id: - return await message.reply_text( - "Do you know that the user you are replying is an admin ?" - ) + return await message.reply_text("Do you know that the user you are replying is an admin ?") - elif ( - reply_id in list_of_admins - or reply_id == message.chat.id - or reply_id == linked_chat.id - ): - return await message.reply_text( - "Do you know that the user you are replying is an admin ?" - ) - user_mention = ( - reply.from_user.mention if reply.from_user else reply.sender_chat.title - ) + elif reply_id in list_of_admins or reply_id == message.chat.id or reply_id == linked_chat.id: + return await message.reply_text("Do you know that the user you are replying is an admin ?") + user_mention = reply.from_user.mention if reply.from_user else reply.sender_chat.title text = f"Reported {user_mention} to admins!" - admin_data = [ - m - async for m in app.get_chat_members( - message.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS - ) - ] + admin_data = [m async for m in app.get_chat_members(message.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS)] for admin in admin_data: if admin.user.is_bot or admin.user.is_deleted: # return bots or deleted admins diff --git a/misskaty/plugins/dev.py b/misskaty/plugins/dev.py index 9404b796..3c1309c1 100644 --- a/misskaty/plugins/dev.py +++ b/misskaty/plugins/dev.py @@ -50,9 +50,7 @@ async def donate(_, message): ) -@app.on_message( - filters.command(["balas"], COMMAND_HANDLER) & filters.user(SUDO) & filters.reply -) +@app.on_message(filters.command(["balas"], COMMAND_HANDLER) & filters.user(SUDO) & filters.reply) async def balas(c, m): pesan = m.text.split(" ", 1) await m.delete() @@ -66,9 +64,7 @@ async def neofetch(c, m): @app.on_message(filters.command(["shell", "sh"], COMMAND_HANDLER) & filters.user(SUDO)) -@app.on_edited_message( - filters.command(["shell", "sh"], COMMAND_HANDLER) & filters.user(SUDO) -) +@app.on_edited_message(filters.command(["shell", "sh"], COMMAND_HANDLER) & filters.user(SUDO)) async def shell(_, m): cmd = m.text.split(" ", 1) if len(cmd) == 1: @@ -81,15 +77,7 @@ async def shell(_, m): await m.reply_document( document=doc, file_name=doc.name, - reply_markup=InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - text="❌ Close", callback_data=f"close#{m.from_user.id}" - ) - ] - ] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="❌ Close", callback_data=f"close#{m.from_user.id}")]]), ) try: os.remove("shell_output.txt") @@ -99,15 +87,7 @@ async def shell(_, m): await m.reply( shell, parse_mode=enums.ParseMode.HTML, - reply_markup=InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - text="❌ Close", callback_data=f"close#{m.from_user.id}" - ) - ] - ] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="❌ Close", callback_data=f"close#{m.from_user.id}")]]), ) else: await m.reply("No Reply") @@ -157,15 +137,7 @@ async def evaluation_cmd_t(_, m): document="MissKatyEval.txt", caption=f"{cmd[: 4096 // 4 - 1]}", disable_notification=True, - reply_markup=InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - text="❌ Close", callback_data=f"close#{m.from_user.id}" - ) - ] - ] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="❌ Close", callback_data=f"close#{m.from_user.id}")]]), ) os.remove("MissKatyEval.txt") await status_message.delete() @@ -173,32 +145,17 @@ async def evaluation_cmd_t(_, m): await status_message.edit( final_output, parse_mode=enums.ParseMode.MARKDOWN, - reply_markup=InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - text="❌ Close", callback_data=f"close#{m.from_user.id}" - ) - ] - ] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="❌ Close", callback_data=f"close#{m.from_user.id}")]]), ) async def aexec(code, c, m): - exec( - "async def __aexec(c, m): " - + "\n p = print" - + "\n replied = m.reply_to_message" - + "".join(f"\n {l_}" for l_ in code.split("\n")) - ) + exec("async def __aexec(c, m): " + "\n p = print" + "\n replied = m.reply_to_message" + "".join(f"\n {l_}" for l_ in code.split("\n"))) return await locals()["__aexec"](c, m) async def shell_exec(code, treat=True): - process = await asyncio.create_subprocess_shell( - code, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT - ) + process = await asyncio.create_subprocess_shell(code, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) stdout = (await process.communicate())[0] if treat: diff --git a/misskaty/plugins/genss.py b/misskaty/plugins/genss.py index cfb85835..a8295a6a 100644 --- a/misskaty/plugins/genss.py +++ b/misskaty/plugins/genss.py @@ -40,9 +40,7 @@ async def genss(client, message): media = v break if media is None: - return await message.reply( - "Reply to a Telegram Video or document as video to generate screenshoot!" - ) + return await message.reply("Reply to a Telegram Video or document as video to generate screenshoot!") process = await message.reply_text("`Processing, please wait..`") c_time = time.time() the_real_download_location = await client.download_media( @@ -64,16 +62,12 @@ async def genss(client, message): chat_id=message.chat.id, message_id=process.id, ) - await client.send_chat_action( - chat_id=message.chat.id, action=enums.ChatAction.UPLOAD_PHOTO - ) + await client.send_chat_action(chat_id=message.chat.id, action=enums.ChatAction.UPLOAD_PHOTO) try: await gather( *[ - message.reply_document( - images, reply_to_message_id=message.id - ), + message.reply_document(images, reply_to_message_id=message.id), message.reply_photo(images, reply_to_message_id=message.id), ] ) @@ -81,9 +75,7 @@ async def genss(client, message): await sleep(e.value) await gather( *[ - message.reply_document( - images, reply_to_message_id=message.id - ), + message.reply_document(images, reply_to_message_id=message.id), message.reply_photo(images, reply_to_message_id=message.id), ] ) @@ -115,13 +107,9 @@ async def genss_link(client, message): try: link = message.text.split(" ")[1] if link.startswith("https://file.yasirweb.my.id"): - link = link.replace( - "https://file.yasirweb.my.id", "https://file.yasiraris.workers.dev" - ) + link = link.replace("https://file.yasirweb.my.id", "https://file.yasiraris.workers.dev") if link.startswith("https://link.yasirweb.my.id"): - link = link.replace( - "https://link.yasirweb.my.id", "https://yasirrobot.herokuapp.com" - ) + link = link.replace("https://link.yasirweb.my.id", "https://yasirrobot.herokuapp.com") process = await message.reply_text("`Processing, please wait..`") tmp_directory_for_each_user = f"./MissKaty_Genss/{str(message.from_user.id)}" if not os.path.isdir(tmp_directory_for_each_user): @@ -133,9 +121,7 @@ async def genss_link(client, message): chat_id=message.chat.id, message_id=process.id, ) - await client.send_chat_action( - chat_id=message.chat.id, action=enums.ChatAction.UPLOAD_PHOTO - ) + await client.send_chat_action(chat_id=message.chat.id, action=enums.ChatAction.UPLOAD_PHOTO) try: await message.reply_media_group(images, reply_to_message_id=message.id) except FloodWait as e: diff --git a/misskaty/plugins/grup_tools.py b/misskaty/plugins/grup_tools.py index 61648c88..b5dc71b5 100644 --- a/misskaty/plugins/grup_tools.py +++ b/misskaty/plugins/grup_tools.py @@ -45,9 +45,7 @@ def draw_multiple_line_text(image, text, font, text_start_height): lines = textwrap.wrap(text, width=50) for line in lines: line_width, line_height = font.getsize(line) - draw.text( - ((image_width - line_width) / 2, y_text), line, font=font, fill="black" - ) + draw.text(((image_width - line_width) / 2, y_text), line, font=font, fill="black") y_text += line_height @@ -57,12 +55,8 @@ def welcomepic(pic, user, chat, count, id): background = background.resize((1024, 500), Image.ANTIALIAS) pfp = Image.open(pic).convert("RGBA") pfp = circle(pfp) - pfp = pfp.resize( - (265, 265) - ) # Resizes the Profilepicture so it fits perfectly in the circle - font = ImageFont.truetype( - "Calistoga-Regular.ttf", 37 - ) # <- Text Font of the Member Count. Change the text size for your preference + pfp = pfp.resize((265, 265)) # Resizes the Profilepicture so it fits perfectly in the circle + font = ImageFont.truetype("Calistoga-Regular.ttf", 37) # <- Text Font of the Member Count. Change the text size for your preference member_text = f"User#{count}, Selamat Datang {user}" # <- Text under the Profilepicture with the Membercount draw_multiple_line_text(background, member_text, font, 395) draw_multiple_line_text(background, chat, font, 47) @@ -73,23 +67,15 @@ def welcomepic(pic, user, chat, count, id): size=20, align="right", ) - background.paste( - pfp, (379, 123), pfp - ) # Pastes the Profilepicture on the Background Image - background.save( - f"downloads/welcome#{id}.png" - ) # Saves the finished Image in the folder with the filename + background.paste(pfp, (379, 123), pfp) # Pastes the Profilepicture on the Background Image + background.save(f"downloads/welcome#{id}.png") # Saves the finished Image in the folder with the filename return f"downloads/welcome#{id}.png" @app.on_chat_member_updated(filters.group & filters.chat(-1001128045651)) @capture_err async def member_has_joined(c: app, member: ChatMemberUpdated): - if ( - not member.new_chat_member - or member.new_chat_member.status in {"banned", "left", "restricted"} - or member.old_chat_member - ): + if not member.new_chat_member or member.new_chat_member.status in {"banned", "left", "restricted"} or member.old_chat_member: return user = member.new_chat_member.user if member.new_chat_member else member.from_user if user.id in SUDO: @@ -108,21 +94,15 @@ async def member_has_joined(c: app, member: ChatMemberUpdated): pass mention = f"{user.first_name}" joined_date = datetime.fromtimestamp(time.time()).strftime("%Y.%m.%d %H:%M:%S") - first_name = ( - f"{user.first_name} {user.last_name}" if user.last_name else user.first_name - ) + first_name = f"{user.first_name} {user.last_name}" if user.last_name else user.first_name id = user.id dc = user.dc_id or "Member tanpa PP" count = await app.get_chat_members_count(member.chat.id) try: - pic = await app.download_media( - user.photo.big_file_id, file_name=f"pp{user.id}.png" - ) + pic = await app.download_media(user.photo.big_file_id, file_name=f"pp{user.id}.png") except AttributeError: pic = "img/profilepic.png" - welcomeimg = await welcomepic( - pic, user.first_name, member.chat.title, count, user.id - ) + welcomeimg = await welcomepic(pic, user.first_name, member.chat.title, count, user.id) temp.MELCOW[f"welcome-{member.chat.id}"] = await c.send_photo( member.chat.id, photo=welcomeimg, @@ -131,30 +111,18 @@ async def member_has_joined(c: app, member: ChatMemberUpdated): userspammer = "" # Spamwatch Detection try: - headers = { - "Authorization": "Bearer XvfzE4AUNXkzCy0DnIVpFDlxZi79lt6EnwKgBj8Quuzms0OSdHvf1k6zSeyzZ_lz" - } - apispamwatch = ( - await http.get( - f"https://api.spamwat.ch/banlist/{user.id}", headers=headers - ) - ).json() + headers = {"Authorization": "Bearer XvfzE4AUNXkzCy0DnIVpFDlxZi79lt6EnwKgBj8Quuzms0OSdHvf1k6zSeyzZ_lz"} + apispamwatch = (await http.get(f"https://api.spamwat.ch/banlist/{user.id}", headers=headers)).json() if not apispamwatch.get("error"): - await app.ban_chat_member( - member.chat.id, user.id, datetime.now() + timedelta(seconds=30) - ) + await app.ban_chat_member(member.chat.id, user.id, datetime.now() + timedelta(seconds=30)) userspammer += f"#SpamWatch Federation Ban\nUser {mention} [{user.id}] has been kicked because {apispamwatch.get('reason')}.\n" except Exception as err: LOGGER.error(f"ERROR in Spamwatch Detection. {err}") # Combot API Detection try: - apicombot = ( - await http.get(f"https://api.cas.chat/check?user_id={user.id}") - ).json() + apicombot = (await http.get(f"https://api.cas.chat/check?user_id={user.id}")).json() if apicombot.get("ok") == "true": - await app.ban_chat_member( - member.chat.id, user.id, datetime.now() + timedelta(seconds=30) - ) + await app.ban_chat_member(member.chat.id, user.id, datetime.now() + timedelta(seconds=30)) userspammer += f"#CAS Federation Ban\nUser {mention} [{user.id}] detected as spambot and has been kicked. Powered by Combot AntiSpam." except Exception as err: LOGGER.error(f"ERROR in Combot API Detection. {err}") @@ -183,9 +151,7 @@ async def save_group(bot, message): await db.add_chat(message.chat.id, message.chat.title) if message.chat.id in temp.BANNED_CHATS: # Inspired from a boat of a banana tree - buttons = [ - [InlineKeyboardButton("Support", url=f"https://t.me/{SUPPORT_CHAT}")] - ] + buttons = [[InlineKeyboardButton("Support", url=f"https://t.me/{SUPPORT_CHAT}")]] reply_markup = InlineKeyboardMarkup(buttons) k = await message.reply( text="CHAT NOT ALLOWED 🐞\n\nMy admins has restricted me from working here ! If you want to know more about it contact support..", @@ -200,9 +166,7 @@ async def save_group(bot, message): return buttons = [ [ - InlineKeyboardButton( - "ℹ️ Help", url=f"https://t.me/{temp.U_NAME}?start=help" - ), + InlineKeyboardButton("ℹ️ Help", url=f"https://t.me/{temp.U_NAME}?start=help"), InlineKeyboardButton("πŸ“’ Updates", url="https://t.me/YasirPediaChannel"), ] ] @@ -215,14 +179,10 @@ async def save_group(bot, message): for u in message.new_chat_members: count = await app.get_chat_members_count(message.chat.id) try: - pic = await app.download_media( - u.photo.big_file_id, file_name=f"pp{u.id}.png" - ) + pic = await app.download_media(u.photo.big_file_id, file_name=f"pp{u.id}.png") except AttributeError: pic = "img/profilepic.png" - welcomeimg = await welcomepic( - pic, u.first_name, message.chat.title, count, u.id - ) + welcomeimg = await welcomepic(pic, u.first_name, message.chat.title, count, u.id) if (temp.MELCOW).get(f"welcome-{message.chat.id}") is not None: try: await (temp.MELCOW[f"welcome-{message.chat.id}"]).delete() @@ -253,9 +213,7 @@ async def leave_a_chat(bot, message): except: chat = chat try: - buttons = [ - [InlineKeyboardButton("Support", url=f"https://t.me/{SUPPORT_CHAT}")] - ] + buttons = [[InlineKeyboardButton("Support", url=f"https://t.me/{SUPPORT_CHAT}")]] reply_markup = InlineKeyboardMarkup(buttons) await bot.send_message( chat_id=chat, @@ -287,16 +245,12 @@ async def disable_chat(bot, message): if not cha_t: return await message.reply("Chat Not Found In DB") if cha_t["is_disabled"]: - return await message.reply( - f"This chat is already disabled:\nReason- {cha_t['reason']} " - ) + return await message.reply(f"This chat is already disabled:\nReason- {cha_t['reason']} ") await db.disable_chat(chat_, reason) temp.BANNED_CHATS.append(chat_) await message.reply("Chat Succesfully Disabled") try: - buttons = [ - [InlineKeyboardButton("Support", url=f"https://t.me/{SUPPORT_CHAT}")] - ] + buttons = [[InlineKeyboardButton("Support", url=f"https://t.me/{SUPPORT_CHAT}")]] reply_markup = InlineKeyboardMarkup(buttons) await bot.send_message( chat_id=chat_, @@ -341,9 +295,7 @@ async def gen_invite(bot, message): try: link = await bot.create_chat_invite_link(chat) except ChatAdminRequired: - return await message.reply( - "Invite Link Generation Failed, Iam Not Having Sufficient Rights" - ) + return await message.reply("Invite Link Generation Failed, Iam Not Having Sufficient Rights") except Exception as e: return await message.reply(f"Error {e}") await message.reply(f"Here is your Invite Link {link.invite_link}") @@ -356,15 +308,11 @@ async def adminlist(_, message): return await message.reply("Perintah ini hanya untuk grup") try: administrators = [] - async for m in app.get_chat_members( - message.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS - ): + async for m in app.get_chat_members(message.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS): administrators.append(f"{m.user.first_name}") res = "".join(f"~ {i}\n" for i in administrators) - return await message.reply( - f"Daftar Admin di {message.chat.title} ({message.chat.id}):\n~ {res}" - ) + return await message.reply(f"Daftar Admin di {message.chat.title} ({message.chat.id}):\n~ {res}") except Exception as e: await message.reply(f"ERROR: {str(e)}") @@ -382,9 +330,7 @@ async def kickme(_, message): await message.reply_text(txt) await message.chat.unban_member(message.from_user.id) except RPCError as ef: - await message.reply_text( - f"Sepertinya ada error, silahkan report ke owner saya. \nERROR: {str(ef)}" - ) + await message.reply_text(f"Sepertinya ada error, silahkan report ke owner saya. \nERROR: {str(ef)}") except Exception as err: await message.reply(f"ERROR: {err}") diff --git a/misskaty/plugins/imdb_search.py b/misskaty/plugins/imdb_search.py index 5501ef55..9c5e4a45 100644 --- a/misskaty/plugins/imdb_search.py +++ b/misskaty/plugins/imdb_search.py @@ -9,7 +9,7 @@ from pyrogram.types import ( from misskaty import app, BOT_USERNAME from misskaty.vars import COMMAND_HANDLER from misskaty.core.decorator.errors import capture_err -from misskaty.helper.tools import rentry, get_random_string, GENRES_EMOJI +from misskaty.helper.tools import GENRES_EMOJI LOGGER = getLogger(__name__) @@ -55,9 +55,7 @@ async def imdbcari_id(client, query: CallbackQuery): r = await ambil_source(f"https://yasirapi.eu.org/imdb-search?q={kueri}") res = json.loads(r).get("result") if not res: - return await query.message.edit_caption( - f"⛔️ Tidak ditemukan hasil untuk kueri: {kueri}" - ) + return await query.message.edit_caption(f"⛔️ Tidak ditemukan hasil untuk kueri: {kueri}") msg += f"🎬 Ditemukan ({len(res)}) hasil dari: {kueri} ~ {query.from_user.mention}\n\n" for num, movie in enumerate(res, start=1): title = movie.get("l") @@ -65,18 +63,12 @@ async def imdbcari_id(client, query: CallbackQuery): type = movie.get("q").replace("feature", "movie").capitalize() movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] msg += f"{num}. {title} {year} - {type}\n" - BTN.append( - InlineKeyboardButton( - text=num, callback_data=f"imdbres_id#{uid}#{movieID}" - ) - ) + BTN.append(InlineKeyboardButton(text=num, callback_data=f"imdbres_id#{uid}#{movieID}")) BTN.append(InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}")) buttons.add(*BTN) await query.message.edit_caption(msg, reply_markup=buttons) except Exception as err: - await query.message.edit_caption( - f"Ooppss, gagal mendapatkan daftar judul di IMDb.\n\nERROR: {err}" - ) + await query.message.edit_caption(f"Ooppss, gagal mendapatkan daftar judul di IMDb.\n\nERROR: {err}") @app.on_callback_query(filters.regex("^imdbcari_en")) @@ -94,9 +86,7 @@ async def imdbcari_en(client, query: CallbackQuery): r = await ambil_source(f"https://yasirapi.eu.org/imdb-search?q={kueri}") res = json.loads(r).get("result") if not res: - return await query.message.edit_caption( - f"⛔️ Result not found for keywords: {kueri}" - ) + return await query.message.edit_caption(f"⛔️ Result not found for keywords: {kueri}") msg += f"🎬 Found ({len(res)}) result for keywords: {kueri} ~ {query.from_user.mention}\n\n" for num, movie in enumerate(res, start=1): title = movie.get("l") @@ -104,22 +94,12 @@ async def imdbcari_en(client, query: CallbackQuery): type = movie.get("q").replace("feature", "movie").capitalize() movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] msg += f"{num}. {title} {year} - {type}\n" - BTN.append( - InlineKeyboardButton( - text=num, callback_data=f"imdbres_en#{query.from_user.id}#{movieID}" - ) - ) - BTN.append( - InlineKeyboardButton( - text="❌ Close", callback_data=f"close#{query.from_user.id}" - ) - ) + BTN.append(InlineKeyboardButton(text=num, callback_data=f"imdbres_en#{query.from_user.id}#{movieID}")) + BTN.append(InlineKeyboardButton(text="❌ Close", callback_data=f"close#{query.from_user.id}")) buttons.add(*BTN) await query.message.edit_caption(msg, reply_markup=buttons) except Exception as err: - await query.message.edit_caption( - f"Failed when requesting movies title @ IMDb\n\nERROR: {err}" - ) + await query.message.edit_caption(f"Failed when requesting movies title @ IMDb\n\nERROR: {err}") @app.on_callback_query(filters.regex("^imdbres_id")) @@ -132,18 +112,12 @@ async def imdb_id_callback(bot: Client, query: CallbackQuery): url = f"https://www.imdb.com/title/tt{movie}/" resp = await get_content(url) sop = BeautifulSoup(resp, "lxml") - r_json = json.loads( - sop.find("script", attrs={"type": "application/ld+json"}).contents[0] - ) + r_json = json.loads(sop.find("script", attrs={"type": "application/ld+json"}).contents[0]) res_str = "" type = f"{r_json['@type']}" if r_json.get("@type") else "" if r_json.get("name"): try: - tahun = ( - sop.select('ul[data-testid="hero-title-block__metadata"]')[0] - .find(class_="sc-8c396aa2-2 itZqyK") - .text - ) + tahun = sop.select('ul[data-testid="hero-title-block__metadata"]')[0].find(class_="sc-8c396aa2-2 itZqyK").text except: tahun = "-" res_str += f"πŸ“Ή Judul: {r_json['name']} [{tahun}] ({type})\n" @@ -152,41 +126,21 @@ async def imdb_id_callback(bot: Client, query: CallbackQuery): else: res_str += "\n" if sop.select('li[data-testid="title-techspec_runtime"]'): - durasi = ( - sop.select('li[data-testid="title-techspec_runtime"]')[0] - .find(class_="ipc-metadata-list-item__content-container") - .text - ) + durasi = sop.select('li[data-testid="title-techspec_runtime"]')[0].find(class_="ipc-metadata-list-item__content-container").text res_str += f"Durasi: {GoogleTranslator('auto', 'id').translate(durasi)}\n" if r_json.get("contentRating"): res_str += f"Kategori: {r_json['contentRating']} \n" if r_json.get("aggregateRating"): res_str += f"Peringkat: {r_json['aggregateRating']['ratingValue']}⭐️ dari {r_json['aggregateRating']['ratingCount']} pengguna \n" if sop.select('li[data-testid="title-details-releasedate"]'): - rilis = ( - sop.select('li[data-testid="title-details-releasedate"]')[0] - .find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) - .text - ) - rilis_url = sop.select('li[data-testid="title-details-releasedate"]')[ - 0 - ].find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - )[ - "href" - ] - res_str += ( - f"Rilis: {rilis}\n" - ) + rilis = sop.select('li[data-testid="title-details-releasedate"]')[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link").text + rilis_url = sop.select('li[data-testid="title-details-releasedate"]')[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")["href"] + res_str += f"Rilis: {rilis}\n" if r_json.get("genre"): genre = "" for i in r_json["genre"]: if i in GENRES_EMOJI: - genre += ( - f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " - ) + genre += f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " else: genre += f"#{i.replace('-', '_').replace(' ', '_')}, " genre = genre[:-2] @@ -194,22 +148,13 @@ async def imdb_id_callback(bot: Client, query: CallbackQuery): if sop.select('li[data-testid="title-details-origin"]'): country = "".join( f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " - for country in sop.select('li[data-testid="title-details-origin"]')[ - 0 - ].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) + for country in sop.select('li[data-testid="title-details-origin"]')[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link") ) country = country[:-2] res_str += f"Negara: {country}\n" if sop.select('li[data-testid="title-details-languages"]'): language = "".join( - f"#{lang.text.replace(' ', '_').replace('-', '_')}, " - for lang in sop.select('li[data-testid="title-details-languages"]')[ - 0 - ].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) + f"#{lang.text.replace(' ', '_').replace('-', '_')}, " for lang in sop.select('li[data-testid="title-details-languages"]')[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link") ) language = language[:-2] res_str += f"Bahasa: {language}\n" @@ -240,9 +185,7 @@ async def imdb_id_callback(bot: Client, query: CallbackQuery): actors = actors[:-2] res_str += f"Pemeran: {actors}\n\n" if r_json.get("description"): - summary = GoogleTranslator("auto", "id").translate( - r_json.get("description") - ) + summary = GoogleTranslator("auto", "id").translate(r_json.get("description")) res_str += f"πŸ“œ Plot: {summary}\n\n" if r_json.get("keywords"): keywords = r_json["keywords"].split(",") @@ -253,11 +196,7 @@ async def imdb_id_callback(bot: Client, query: CallbackQuery): key_ = key_[:-2] res_str += f"πŸ”₯ Kata Kunci: {key_} \n" if sop.select('li[data-testid="award_information"]'): - awards = ( - sop.select('li[data-testid="award_information"]')[0] - .find(class_="ipc-metadata-list-item__list-content-item") - .text - ) + awards = sop.select('li[data-testid="award_information"]')[0].find(class_="ipc-metadata-list-item__list-content-item").text res_str += f"πŸ† Penghargaan: {GoogleTranslator('auto', 'id').translate(awards)}\n\n" else: res_str += "\n" @@ -267,33 +206,19 @@ async def imdb_id_callback(bot: Client, query: CallbackQuery): markup = InlineKeyboardMarkup( [ [ - InlineKeyboardButton( - "🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}" - ), + InlineKeyboardButton("🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}"), InlineKeyboardButton("▢️ Trailer", url=trailer_url), ] ] ) else: - markup = InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - "🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}" - ) - ] - ] - ) + markup = InlineKeyboardMarkup([[InlineKeyboardButton("🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}")]]) if thumb := r_json.get("image"): try: - await query.message.edit_media( - InputMediaPhoto(thumb, caption=res_str), reply_markup=markup - ) + await query.message.edit_media(InputMediaPhoto(thumb, caption=res_str), reply_markup=markup) except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty): poster = thumb.replace(".jpg", "._V1_UX360.jpg") - await query.message.edit_media( - InputMediaPhoto(poster, caption=res_str), reply_markup=markup - ) + await query.message.edit_media(InputMediaPhoto(poster, caption=res_str), reply_markup=markup) except Exception: await query.message.edit_caption(res_str, reply_markup=markup) else: @@ -315,18 +240,12 @@ async def imdb_en_callback(bot: Client, query: CallbackQuery): url = f"https://www.imdb.com/title/tt{movie}/" resp = await get_content(url) sop = BeautifulSoup(resp, "lxml") - r_json = json.loads( - sop.find("script", attrs={"type": "application/ld+json"}).contents[0] - ) + r_json = json.loads(sop.find("script", attrs={"type": "application/ld+json"}).contents[0]) res_str = "" type = f"{r_json['@type']}" if r_json.get("@type") else "" if r_json.get("name"): try: - tahun = ( - sop.select('ul[data-testid="hero-title-block__metadata"]')[0] - .find(class_="sc-8c396aa2-2 itZqyK") - .text - ) + tahun = sop.select('ul[data-testid="hero-title-block__metadata"]')[0].find(class_="sc-8c396aa2-2 itZqyK").text except: tahun = "-" res_str += f"πŸ“Ή Title: {r_json['name']} [{tahun}] ({type})\n" @@ -335,39 +254,21 @@ async def imdb_en_callback(bot: Client, query: CallbackQuery): else: res_str += "\n" if sop.select('li[data-testid="title-techspec_runtime"]'): - durasi = ( - sop.select('li[data-testid="title-techspec_runtime"]')[0] - .find(class_="ipc-metadata-list-item__content-container") - .text - ) + durasi = sop.select('li[data-testid="title-techspec_runtime"]')[0].find(class_="ipc-metadata-list-item__content-container").text res_str += f"Duration: {durasi}\n" if r_json.get("contentRating"): res_str += f"Category: {r_json['contentRating']} \n" if r_json.get("aggregateRating"): res_str += f"Rating: {r_json['aggregateRating']['ratingValue']}⭐️ from {r_json['aggregateRating']['ratingCount']} user \n" if sop.select('li[data-testid="title-details-releasedate"]'): - rilis = ( - sop.select('li[data-testid="title-details-releasedate"]')[0] - .find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) - .text - ) - rilis_url = sop.select('li[data-testid="title-details-releasedate"]')[ - 0 - ].find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - )[ - "href" - ] + rilis = sop.select('li[data-testid="title-details-releasedate"]')[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link").text + rilis_url = sop.select('li[data-testid="title-details-releasedate"]')[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")["href"] res_str += f"Release Data: {rilis}\n" if r_json.get("genre"): genre = "" for i in r_json["genre"]: if i in GENRES_EMOJI: - genre += ( - f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " - ) + genre += f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " else: genre += f"#{i.replace('-', '_').replace(' ', '_')}, " genre = genre[:-2] @@ -375,22 +276,13 @@ async def imdb_en_callback(bot: Client, query: CallbackQuery): if sop.select('li[data-testid="title-details-origin"]'): country = "".join( f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " - for country in sop.select('li[data-testid="title-details-origin"]')[ - 0 - ].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) + for country in sop.select('li[data-testid="title-details-origin"]')[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link") ) country = country[:-2] res_str += f"Country: {country}\n" if sop.select('li[data-testid="title-details-languages"]'): language = "".join( - f"#{lang.text.replace(' ', '_').replace('-', '_')}, " - for lang in sop.select('li[data-testid="title-details-languages"]')[ - 0 - ].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) + f"#{lang.text.replace(' ', '_').replace('-', '_')}, " for lang in sop.select('li[data-testid="title-details-languages"]')[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link") ) language = language[:-2] res_str += f"Language: {language}\n" @@ -431,11 +323,7 @@ async def imdb_en_callback(bot: Client, query: CallbackQuery): key_ = key_[:-2] res_str += f"πŸ”₯ Keywords: {key_} \n" if sop.select('li[data-testid="award_information"]'): - awards = ( - sop.select('li[data-testid="award_information"]')[0] - .find(class_="ipc-metadata-list-item__list-content-item") - .text - ) + awards = sop.select('li[data-testid="award_information"]')[0].find(class_="ipc-metadata-list-item__list-content-item").text res_str += f"πŸ† Awards: {awards}\n\n" else: res_str += "\n" @@ -445,33 +333,19 @@ async def imdb_en_callback(bot: Client, query: CallbackQuery): markup = InlineKeyboardMarkup( [ [ - InlineKeyboardButton( - "🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}" - ), + InlineKeyboardButton("🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}"), InlineKeyboardButton("▢️ Trailer", url=trailer_url), ] ] ) else: - markup = InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - "🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}" - ) - ] - ] - ) + markup = InlineKeyboardMarkup([[InlineKeyboardButton("🎬 Open IMDB", url=f"https://www.imdb.com{r_json['url']}")]]) if thumb := r_json.get("image"): try: - await query.message.edit_media( - InputMediaPhoto(thumb, caption=res_str), reply_markup=markup - ) + await query.message.edit_media(InputMediaPhoto(thumb, caption=res_str), reply_markup=markup) except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty): poster = thumb.replace(".jpg", "._V1_UX360.jpg") - await query.message.edit_media( - InputMediaPhoto(poster, caption=res_str), reply_markup=markup - ) + await query.message.edit_media(InputMediaPhoto(poster, caption=res_str), reply_markup=markup) except Exception: await query.message.edit_caption(res_str, reply_markup=markup) else: diff --git a/misskaty/plugins/inline_search.py b/misskaty/plugins/inline_search.py index 4b592de9..f1778041 100644 --- a/misskaty/plugins/inline_search.py +++ b/misskaty/plugins/inline_search.py @@ -39,12 +39,7 @@ PRVT_MSGS = {} async def inline_menu(_, inline_query: InlineQuery): if inline_query.query.strip().lower().strip() == "": buttons = InlineKeyboard(row_width=2) - buttons.add( - *[ - (InlineKeyboardButton(text=i, switch_inline_query_current_chat=i)) - for i in keywords_list - ] - ) + buttons.add(*[(InlineKeyboardButton(text=i, switch_inline_query_current_chat=i)) for i in keywords_list]) btn = InlineKeyboard(row_width=2) bot_state = "Dead" if not await app.get_me() else "Alive" @@ -68,27 +63,21 @@ async def inline_menu(_, inline_query: InlineQuery): InlineQueryResultArticle( title="Inline Commands", description="Help Related To Inline Usage.", - input_message_content=InputTextMessageContent( - "Click A Button To Get Started." - ), + input_message_content=InputTextMessageContent("Click A Button To Get Started."), thumb_url="https://hamker.me/cy00x5x.png", reply_markup=buttons, ), InlineQueryResultArticle( title="Github Repo", description="Github Repo of This Bot.", - input_message_content=InputTextMessageContent( - f"Github Repo @{BOT_USERNAME}\n\nhttps://github.com/yasirarism/MissKatyPyro" - ), + input_message_content=InputTextMessageContent(f"Github Repo @{BOT_USERNAME}\n\nhttps://github.com/yasirarism/MissKatyPyro"), thumb_url="https://hamker.me/gjc9fo3.png", ), InlineQueryResultArticle( title="Alive", description="Check Bot's Stats", thumb_url="https://yt3.ggpht.com/ytc/AMLnZu-zbtIsllERaGYY8Aecww3uWUASPMjLUUEt7ecu=s900-c-k-c0x00ffffff-no-rj", - input_message_content=InputTextMessageContent( - msg, disable_web_page_preview=True - ), + input_message_content=InputTextMessageContent(msg, disable_web_page_preview=True), reply_markup=btn, ), ] @@ -101,13 +90,8 @@ async def inline_menu(_, inline_query: InlineQuery): switch_pm_parameter="inline", ) judul = inline_query.query.split(None, 1)[1].strip() - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/61.0.3163.100 Safari/537.36" - } - search_results = await http.get( - f"https://www.google.com/search?q={judul}&num=20", headers=headers - ) + headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/61.0.3163.100 Safari/537.36"} + search_results = await http.get(f"https://www.google.com/search?q={judul}&num=20", headers=headers) soup = BeautifulSoup(search_results.text, "lxml") data = [] for result in soup.select(".tF2Cxc"): @@ -130,9 +114,7 @@ async def inline_menu(_, inline_query: InlineQuery): url=link, description=snippet, thumb_url="https://te.legra.ph/file/ed8ea62ae636793000bb4.jpg", - reply_markup=InlineKeyboardMarkup( - [[InlineKeyboardButton(text="Open Website", url=link)]] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="Open Website", url=link)]]), ) ) await inline_query.answer( @@ -156,9 +138,7 @@ async def inline_menu(_, inline_query: InlineQuery): except Exception: # pylint: disable=broad-except inline_query.stop_propagation() return - namanya = ( - f"{diaa.first_name} {diaa.last_name}" if diaa.last_name else diaa.first_name - ) + namanya = f"{diaa.first_name} {diaa.last_name}" if diaa.last_name else diaa.first_name msg = f"🏷 Name: {namanya}\nπŸ†” ID: {diaa.id}\n" if diaa.username: msg += f"🌐 Username: @{diaa.username}\n" @@ -206,11 +186,7 @@ async def inline_menu(_, inline_query: InlineQuery): ) prvte_msg = InlineKeyboardMarkup( [ - [ - InlineKeyboardButton( - "Show Message πŸ”", callback_data=f"prvtmsg({inline_query.id})" - ) - ], + [InlineKeyboardButton("Show Message πŸ”", callback_data=f"prvtmsg({inline_query.id})")], [ InlineKeyboardButton( "Destroy☠️ this msg", @@ -219,14 +195,8 @@ async def inline_menu(_, inline_query: InlineQuery): ], ] ) - mention = ( - f"{penerima.first_name}" - if not penerima.username - else f"@{penerima.username}" - ) - msg_c = ( - f"πŸ”’ A private message to {mention} [{penerima.id}], " - ) + mention = f"{penerima.first_name}" if not penerima.username else f"@{penerima.username}" + msg_c = f"πŸ”’ A private message to {mention} [{penerima.id}], " msg_c += "Only he/she can open it." results = [ InlineQueryResultArticle( @@ -246,9 +216,7 @@ async def inline_menu(_, inline_query: InlineQuery): switch_pm_parameter="inline", ) query = inline_query.query.split(None, 1)[1].strip() - search_results = await http.get( - f"https://api.github.com/search/repositories?q={query}" - ) + search_results = await http.get(f"https://api.github.com/search/repositories?q={query}") srch_results = json.loads(search_results.text) item = srch_results.get("items") data = [] @@ -271,9 +239,7 @@ async def inline_menu(_, inline_query: InlineQuery): url=link, description=deskripsi, thumb_url="https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png", - reply_markup=InlineKeyboardMarkup( - [[InlineKeyboardButton(text="Open Github Link", url=link)]] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="Open Github Link", url=link)]]), ) ) await inline_query.answer( @@ -292,9 +258,7 @@ async def inline_menu(_, inline_query: InlineQuery): switch_pm_parameter="inline", ) query = inline_query.query.split(None, 1)[1].strip() - search_results = await http.get( - f"https://api.hayo.my.id/api/pypi?package={query}" - ) + search_results = await http.get(f"https://api.hayo.my.id/api/pypi?package={query}") srch_results = json.loads(search_results.text) data = [] for sraeo in srch_results: @@ -315,9 +279,7 @@ async def inline_menu(_, inline_query: InlineQuery): url=link, description=deskripsi, thumb_url="https://raw.githubusercontent.com/github/explore/666de02829613e0244e9441b114edb85781e972c/topics/pip/pip.png", - reply_markup=InlineKeyboardMarkup( - [[InlineKeyboardButton(text="Open Link", url=link)]] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="Open Link", url=link)]]), ) ) await inline_query.answer( @@ -336,9 +298,7 @@ async def inline_menu(_, inline_query: InlineQuery): switch_pm_parameter="inline", ) judul = inline_query.query.split(None, 1)[1].strip() - search_results = await http.get( - f"https://api.abir-hasan.tk/youtube?query={judul}" - ) + search_results = await http.get(f"https://api.abir-hasan.tk/youtube?query={judul}") srch_results = json.loads(search_results.text) asroe = srch_results.get("results") oorse = [] @@ -350,9 +310,7 @@ async def inline_menu(_, inline_query: InlineQuery): durasi = sraeo.get("accessibility").get("duration") publishTime = sraeo.get("publishedTime") try: - deskripsi = "".join( - f"{i['text']} " for i in sraeo.get("descriptionSnippet") - ) + deskripsi = "".join(f"{i['text']} " for i in sraeo.get("descriptionSnippet")) except: deskripsi = "-" message_text = f"{title}\n" @@ -371,9 +329,7 @@ async def inline_menu(_, inline_query: InlineQuery): url=link, description=deskripsi, thumb_url=thumb, - reply_markup=InlineKeyboardMarkup( - [[InlineKeyboardButton(text="Watch Video πŸ“Ή", url=link)]] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="Watch Video πŸ“Ή", url=link)]]), ) ) await inline_query.answer( @@ -392,9 +348,7 @@ async def inline_menu(_, inline_query: InlineQuery): switch_pm_parameter="inline", ) movie_name = inline_query.query.split(None, 1)[1].strip() - search_results = await http.get( - f"https://yasirapi.eu.org/imdb-search?q={movie_name}" - ) + search_results = await http.get(f"https://yasirapi.eu.org/imdb-search?q={movie_name}") res = json.loads(search_results.text).get("result") oorse = [] for midb in res: @@ -403,11 +357,7 @@ async def inline_menu(_, inline_query: InlineQuery): stars = midb.get("s", "") imdb_url = f"https://imdb.com/title/{midb.get('id')}" year = f"({midb.get('y')})" if midb.get("y") else "" - image_url = ( - midb.get("i").get("imageUrl").replace(".jpg", "._V1_UX360.jpg") - if midb.get("i") - else "https://te.legra.ph/file/e263d10ff4f4426a7c664.jpg" - ) + image_url = midb.get("i").get("imageUrl").replace(".jpg", "._V1_UX360.jpg") if midb.get("i") else "https://te.legra.ph/file/e263d10ff4f4426a7c664.jpg" caption = f"🎬" caption += f"{title} {year}" oorse.append( @@ -482,53 +432,29 @@ async def imdb_inl(_, query): url = f"https://www.imdb.com/title/{movie}/" resp = await get_content(url) sop = BeautifulSoup(resp, "lxml") - r_json = json.loads( - sop.find("script", attrs={"type": "application/ld+json"}).contents[0] - ) + r_json = json.loads(sop.find("script", attrs={"type": "application/ld+json"}).contents[0]) res_str = "" type = f"{r_json['@type']}" if r_json.get("@type") else "" if r_json.get("name"): try: - tahun = ( - sop.select('ul[data-testid="hero-title-block__metadata"]')[0] - .find(class_="sc-8c396aa2-2 itZqyK") - .text - ) + tahun = sop.select('ul[data-testid="hero-title-block__metadata"]')[0].find(class_="sc-8c396aa2-2 itZqyK").text except: tahun = "-" res_str += f"πŸ“Ή Judul: {r_json['name']} [{tahun}] ({type})\n" if r_json.get("alternateName"): - res_str += ( - f"πŸ“’ AKA: {r_json.get('alternateName')}\n\n" - ) + res_str += f"πŸ“’ AKA: {r_json.get('alternateName')}\n\n" else: res_str += "\n" if sop.select('li[data-testid="title-techspec_runtime"]'): - durasi = ( - sop.select('li[data-testid="title-techspec_runtime"]')[0] - .find(class_="ipc-metadata-list-item__content-container") - .text - ) + durasi = sop.select('li[data-testid="title-techspec_runtime"]')[0].find(class_="ipc-metadata-list-item__content-container").text res_str += f"Durasi: {GoogleTranslator('auto', 'id').translate(durasi)}\n" if r_json.get("contentRating"): res_str += f"Kategori: {r_json['contentRating']} \n" if r_json.get("aggregateRating"): res_str += f"Peringkat: {r_json['aggregateRating']['ratingValue']}⭐️ dari {r_json['aggregateRating']['ratingCount']} pengguna \n" if sop.select('li[data-testid="title-details-releasedate"]'): - rilis = ( - sop.select('li[data-testid="title-details-releasedate"]')[0] - .find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) - .text - ) - rilis_url = sop.select('li[data-testid="title-details-releasedate"]')[ - 0 - ].find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - )[ - "href" - ] + rilis = sop.select('li[data-testid="title-details-releasedate"]')[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link").text + rilis_url = sop.select('li[data-testid="title-details-releasedate"]')[0].find(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link")["href"] res_str += f"Rilis: {rilis}\n" if r_json.get("genre"): genre = "" @@ -542,22 +468,14 @@ async def imdb_inl(_, query): if sop.select('li[data-testid="title-details-origin"]'): country = "".join( f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " - for country in sop.select('li[data-testid="title-details-origin"]')[ - 0 - ].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) + for country in sop.select('li[data-testid="title-details-origin"]')[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link") ) country = country[:-2] res_str += f"Negara: {country}\n" if sop.select('li[data-testid="title-details-languages"]'): language = "".join( f"#{lang.text.replace(' ', '_').replace('-', '_')}, " - for lang in sop.select('li[data-testid="title-details-languages"]')[ - 0 - ].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - ) + for lang in sop.select('li[data-testid="title-details-languages"]')[0].findAll(class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link") ) language = language[:-2] res_str += f"Bahasa: {language}\n" @@ -588,9 +506,7 @@ async def imdb_inl(_, query): actors = actors[:-2] res_str += f"Pemeran: {actors}\n\n" if r_json.get("description"): - summary = GoogleTranslator("auto", "id").translate( - r_json.get("description") - ) + summary = GoogleTranslator("auto", "id").translate(r_json.get("description")) res_str += f"πŸ“œ Plot: {summary}\n\n" if r_json.get("keywords"): keywords = r_json["keywords"].split(",") @@ -601,11 +517,7 @@ async def imdb_inl(_, query): key_ = key_[:-2] res_str += f"πŸ”₯ Kata Kunci: {key_} \n" if sop.select('li[data-testid="award_information"]'): - awards = ( - sop.select('li[data-testid="award_information"]')[0] - .find(class_="ipc-metadata-list-item__list-content-item") - .text - ) + awards = sop.select('li[data-testid="award_information"]')[0].find(class_="ipc-metadata-list-item__list-content-item").text res_str += f"πŸ† Penghargaan: {GoogleTranslator('auto', 'id').translate(awards)}\n\n" else: res_str += "\n" diff --git a/misskaty/plugins/json.py b/misskaty/plugins/json.py index 27a141f3..bc3086b4 100644 --- a/misskaty/plugins/json.py +++ b/misskaty/plugins/json.py @@ -23,15 +23,7 @@ async def jsonify(_, message): try: await message.reply_text( f"{the_real_message}", - reply_markup=InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - text="❌ Close", callback_data=f"close#{msg.from_user.id}" - ) - ] - ] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="❌ Close", callback_data=f"close#{msg.from_user.id}")]]), ) except Exception as e: with open("json.text", "w+", encoding="utf8") as out_file: @@ -41,14 +33,6 @@ async def jsonify(_, message): caption=f"{str(e)}", disable_notification=True, reply_to_message_id=reply_to_id, - reply_markup=InlineKeyboardMarkup( - [ - [ - InlineKeyboardButton( - text="❌ Close", callback_data=f"close#{msg.from_user.id}" - ) - ] - ] - ), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text="❌ Close", callback_data=f"close#{msg.from_user.id}")]]), ) os.remove("json.text") diff --git a/misskaty/plugins/misc_tools.py b/misskaty/plugins/misc_tools.py index ec5caf6a..fbb9bf7f 100644 --- a/misskaty/plugins/misc_tools.py +++ b/misskaty/plugins/misc_tools.py @@ -6,7 +6,7 @@ * Copyright @YasirPedia All rights reserved """ -import os, re +import os import aiohttp from bs4 import BeautifulSoup import json @@ -15,26 +15,20 @@ from pyrogram import Client, filters from deep_translator import GoogleTranslator from gtts import gTTS from pyrogram.errors import ( - MediaEmpty, - MessageNotModified, - PhotoInvalidDimensions, UserNotParticipant, - WebpageMediaEmpty, MessageTooLong, ) -from utils import extract_user, get_file_id, demoji +from utils import extract_user, get_file_id import time from datetime import datetime from logging import getLogger -from pykeyboard import InlineKeyboard from pyrogram.types import ( InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery, - InputMediaPhoto, ) from misskaty.core.decorator.errors import capture_err -from misskaty.helper.tools import rentry, get_random_string, GENRES_EMOJI +from misskaty.helper.tools import rentry from misskaty.vars import COMMAND_HANDLER from misskaty.helper.http import http from misskaty import app, BOT_USERNAME @@ -52,6 +46,7 @@ __HELP__ = """ LIST_CARI = {} + def remove_html_tags(text): """Remove html tags from a string""" import re @@ -65,21 +60,13 @@ def remove_html_tags(text): async def stackoverflow(client, message): if len(message.command) == 1: return await message.reply("Give a query to search in StackOverflow!") - r = ( - await http.get( - f"https://api.stackexchange.com/2.3/search/excerpts?order=asc&sort=relevance&q={message.command[1]}&accepted=True&migrated=FalseΒ¬ice=False&wiki=False&site=stackoverflow" - ) - ).json() + r = (await http.get(f"https://api.stackexchange.com/2.3/search/excerpts?order=asc&sort=relevance&q={message.command[1]}&accepted=True&migrated=FalseΒ¬ice=False&wiki=False&site=stackoverflow")).json() msg = await message.reply("Getting data..") hasil = "" for count, data in enumerate(r["items"], start=1): question = data["question_id"] title = data["title"] - snippet = ( - remove_html_tags(data["excerpt"])[:80].replace("\n", "").replace(" ", "") - if len(remove_html_tags(data["excerpt"])) > 80 - else remove_html_tags(data["excerpt"]).replace("\n", "").replace(" ", "") - ) + snippet = remove_html_tags(data["excerpt"])[:80].replace("\n", "").replace(" ", "") if len(remove_html_tags(data["excerpt"])) > 80 else remove_html_tags(data["excerpt"]).replace("\n", "").replace(" ", "") hasil += f"{count}. {title}\n{snippet}\n" try: await msg.edit(hasil) @@ -98,10 +85,7 @@ async def gsearch(client, message): query = message.text.split(" ", maxsplit=1)[1] msg = await message.reply_text(f"**Googling** for `{query}` ...") try: - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/61.0.3163.100 Safari/537.36" - } + headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/61.0.3163.100 Safari/537.36"} html = await http.get( f"https://www.google.com/search?q={query}&gl=id&hl=id&num=17", headers=headers, @@ -130,9 +114,7 @@ async def gsearch(client, message): arr = json.dumps(data, indent=2, ensure_ascii=False) parse = json.loads(arr) total = len(parse) - res = "".join( - f"{i['title']}\n{i['snippet']}\n\n" for i in parse - ) + res = "".join(f"{i['title']}\n{i['snippet']}\n\n" for i in parse) except Exception: exc = traceback.format_exc() return await msg.edit(exc) @@ -145,9 +127,7 @@ async def gsearch(client, message): @app.on_message(filters.command(["tr", "trans", "translate"], COMMAND_HANDLER)) @capture_err async def translate(client, message): - if message.reply_to_message and ( - message.reply_to_message.text or message.reply_to_message.caption - ): + if message.reply_to_message and (message.reply_to_message.text or message.reply_to_message.caption): target_lang = "id" if len(message.command) == 1 else message.text.split()[1] text = message.reply_to_message.text or message.reply_to_message.caption else: @@ -161,14 +141,10 @@ async def translate(client, message): try: my_translator = GoogleTranslator(source="auto", target=target_lang) result = my_translator.translate(text=text) - await msg.edit( - f"Translation using source = {my_translator.source} and target = {my_translator.target}\n\n-> {result}" - ) + await msg.edit(f"Translation using source = {my_translator.source} and target = {my_translator.target}\n\n-> {result}") except MessageTooLong: url = await rentry(tekstr.text) - await msg.edit( - f"Your translated text pasted to rentry because has long text:\n{url}" - ) + await msg.edit(f"Your translated text pasted to rentry because has long text:\n{url}") except Exception as err: await msg.edit(f"Error: {str(err)}") @@ -176,9 +152,7 @@ async def translate(client, message): @app.on_message(filters.command(["tts"], COMMAND_HANDLER)) @capture_err async def tts(_, message): - if message.reply_to_message and ( - message.reply_to_message.text or message.reply_to_message.caption - ): + if message.reply_to_message and (message.reply_to_message.text or message.reply_to_message.caption): if len(message.text.split()) == 1: target_lang = "id" else: @@ -230,16 +204,12 @@ async def topho(client, message): if not message.reply_to_message or not message.reply_to_message.sticker: return await message.reply_text("Reply ke sticker untuk mengubah ke foto") if message.reply_to_message.sticker.is_animated: - return await message.reply_text( - "Ini sticker animasi, command ini hanya untuk sticker biasa." - ) + return await message.reply_text("Ini sticker animasi, command ini hanya untuk sticker biasa.") photo = await client.download_media( message.reply_to_message.sticker.file_id, f"tostick_{message.from_user.id}.jpg", ) - await message.reply_photo( - photo=photo, caption=f"Sticker -> Image\n@{client.me.username}" - ) + await message.reply_photo(photo=photo, caption=f"Sticker -> Image\n@{client.me.username}") os.remove(photo) except Exception as e: @@ -272,16 +242,10 @@ async def showid(client, message): ) file_info = get_file_id(message.reply_to_message) else: - _id += ( - "➲ User ID: " - f"{message.from_user.id if message.from_user else 'Anonymous'}\n" - ) + _id += "➲ User ID: " f"{message.from_user.id if message.from_user else 'Anonymous'}\n" file_info = get_file_id(message) if file_info: - _id += ( - f"{file_info.message_type}: " - f"{file_info.file_id}\n" - ) + _id += f"{file_info.message_type}: " f"{file_info.file_id}\n" await message.reply_text(_id, quote=True) @@ -314,23 +278,13 @@ async def who_is(client, message): if message.chat.type in (("supergroup", "channel")): try: chat_member_p = await message.chat.get_member(from_user.id) - joined_date = datetime.fromtimestamp( - chat_member_p.joined_date or time.time() - ).strftime("%Y.%m.%d %H:%M:%S") - message_out_str += ( - "➲Joined this Chat on: " f"{joined_date}" "\n" - ) + joined_date = datetime.fromtimestamp(chat_member_p.joined_date or time.time()).strftime("%Y.%m.%d %H:%M:%S") + message_out_str += "➲Joined this Chat on: " f"{joined_date}" "\n" except UserNotParticipant: pass if chat_photo := from_user.photo: local_user_photo = await client.download_media(message=chat_photo.big_file_id) - buttons = [ - [ - InlineKeyboardButton( - "πŸ” Close", callback_data=f"close#{message.from_user.id}" - ) - ] - ] + buttons = [[InlineKeyboardButton("πŸ” Close", callback_data=f"close#{message.from_user.id}")]] reply_markup = InlineKeyboardMarkup(buttons) await message.reply_photo( photo=local_user_photo, @@ -341,13 +295,7 @@ async def who_is(client, message): ) os.remove(local_user_photo) else: - buttons = [ - [ - InlineKeyboardButton( - "πŸ” Close", callback_data=f"close#{message.from_user.id}" - ) - ] - ] + buttons = [[InlineKeyboardButton("πŸ” Close", callback_data=f"close#{message.from_user.id}")]] reply_markup = InlineKeyboardMarkup(buttons) await message.reply_text( text=message_out_str, @@ -366,9 +314,7 @@ async def close_callback(bot: Client, query: CallbackQuery): await query.message.delete() -headers = { - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/600.1.17 (KHTML, like Gecko) Version/7.1 Safari/537.85.10" -} +headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/600.1.17 (KHTML, like Gecko) Version/7.1 Safari/537.85.10"} async def get_content(url): @@ -421,22 +367,16 @@ async def mdl_callback(bot: Client, query: CallbackQuery): try: res = (await http.get(f"https://kuryana.vercel.app/id/{slug}")).json() result += f"Title: {res['data']['title']}\n" - result += ( - f"AKA: {res['data']['others']['also_known_as']}\n\n" - ) + result += f"AKA: {res['data']['others']['also_known_as']}\n\n" result += f"Rating: {res['data']['details']['score']}\n" result += f"Content Rating: {res['data']['details']['content_rating']}\n" result += f"Type: {res['data']['details']['type']}\n" - result += ( - f"Country: {res['data']['details']['country']}\n" - ) + result += f"Country: {res['data']['details']['country']}\n" if res["data"]["details"]["type"] == "Movie": result += f"Release Date: {res['data']['details']['release_date']}\n" elif res["data"]["details"]["type"] == "Drama": result += f"Episode: {res['data']['details']['episodes']}\n" - result += ( - f"Aired: {res['data']['details']['aired']}\n" - ) + result += f"Aired: {res['data']['details']['aired']}\n" try: result += f"Aired on: {res['data']['details']['aired_on']}\n" except: @@ -445,19 +385,13 @@ async def mdl_callback(bot: Client, query: CallbackQuery): result += f"Original Network: {res['data']['details']['original_network']}\n" except: pass - result += ( - f"Duration: {res['data']['details']['duration']}\n" - ) - result += ( - f"Genre: {res['data']['others']['genres']}\n\n" - ) + result += f"Duration: {res['data']['details']['duration']}\n" + result += f"Genre: {res['data']['others']['genres']}\n\n" result += f"Synopsis: {res['data']['synopsis']}\n" result += f"Tags: {res['data']['others']['tags']}\n" - btn = InlineKeyboardMarkup( - [[InlineKeyboardButton("🎬 Open MyDramaList", url=res["data"]["link"])]] - ) + btn = InlineKeyboardMarkup([[InlineKeyboardButton("🎬 Open MyDramaList", url=res["data"]["link"])]]) await query.message.edit_text(result, reply_markup=btn) except Exception as e: await query.message.edit_text(f"ERROR:\n{e}") else: - await query.answer("Tombol ini bukan untukmu", show_alert=True) \ No newline at end of file + await query.answer("Tombol ini bukan untukmu", show_alert=True) diff --git a/misskaty/plugins/scrapwebsite.py b/misskaty/plugins/scrapwebsite.py index ea5aa875..4586bb48 100644 --- a/misskaty/plugins/scrapwebsite.py +++ b/misskaty/plugins/scrapwebsite.py @@ -11,9 +11,8 @@ from bs4 import BeautifulSoup import re import asyncio from logging import getLogger -from misskaty import app, BOT_USERNAME +from misskaty import app from pyrogram import filters -from pyrogram.errors import MessageTooLong from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton from misskaty.vars import COMMAND_HANDLER from misskaty.core.decorator.errors import capture_err @@ -34,9 +33,7 @@ __HELP__ = """ LOGGER = getLogger(__name__) -headers = { - "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582" -} +headers = {"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"} @app.on_message(filters.command(["zonafilm"], COMMAND_HANDLER)) @@ -73,11 +70,7 @@ async def zonafilm(_, msg): await m.delete() for c, i in enumerate(data, start=1): msgs += f"{c}. {i['judul']}\nGenre: {i['genre']}\n" - msgs += ( - f"Extract: /{msg.command[0]}_scrap {i['link']}\n\n" - if not "/tv/" in i["link"] - else "\n" - ) + msgs += f"Extract: /{msg.command[0]}_scrap {i['link']}\n\n" if not "/tv/" in i["link"] else "\n" if len(head.encode("utf-8") + msgs.encode("utf-8")) >= 4000: await msg.reply( head + msgs, @@ -594,11 +587,7 @@ async def pahe_scrap(_, msg): if not res["result"]: await m.delete() return await msg.reply("404 Result not FOUND!", True) - head = ( - f"#Pahe Results For: {title}\n\n" - if title - else f"#Pahe Latest:\nπŸŒ€ Use /{msg.command[0]} [title] to start search with title.\n\n" - ) + head = f"#Pahe Results For: {title}\n\n" if title else f"#Pahe Latest:\nπŸŒ€ Use /{msg.command[0]} [title] to start search with title.\n\n" await m.delete() msgs = "" for c, i in enumerate(res["result"], start=1): @@ -659,11 +648,7 @@ async def terbit21_scrap(_, msg): msgs = "" for c, i in enumerate(res["result"], start=1): msgs += f"{c}. {i['judul']}\nCategory: {i['kategori']}\n" - msgs += ( - f"πŸ’  Download\n\n" - if not re.search(r"Complete|Ongoing", i["kategori"]) - else "\n" - ) + msgs += f"πŸ’  Download\n\n" if not re.search(r"Complete|Ongoing", i["kategori"]) else "\n" if len(head.encode("utf-8") + msgs.encode("utf-8")) >= 4000: await msg.reply( head + msgs, @@ -715,11 +700,7 @@ async def terbit21_scrap(_, msg): msgs = "" for c, i in enumerate(res["result"], start=1): msgs += f"{c}. {i['judul']}\nCategory: {i['kategori']}\n" - msgs += ( - f"πŸ’  Download\n\n" - if not re.search(r"Complete|Ongoing", i["kategori"]) - else "\n" - ) + msgs += f"πŸ’  Download\n\n" if not re.search(r"Complete|Ongoing", i["kategori"]) else "\n" if len(head.encode("utf-8") + msgs.encode("utf-8")) >= 4000: await msg.reply( head + msgs, @@ -779,11 +760,7 @@ async def lk21_scrap(_, msg): msgs = "" for c, i in enumerate(res["result"], start=1): msgs += f"{c}. {i['judul']}\nCategory: {i['kategori']}\n" - msgs += ( - f"πŸ’  Download\n\n" - if not re.search(r"Complete|Ongoing", i["kategori"]) - else "\n" - ) + msgs += f"πŸ’  Download\n\n" if not re.search(r"Complete|Ongoing", i["kategori"]) else "\n" if len(head.encode("utf-8") + msgs.encode("utf-8")) >= 4000: await msg.reply( head + msgs, @@ -838,11 +815,7 @@ async def lk21_scrap(_, msg): msgs = "" for c, i in enumerate(res["result"], start=1): msgs += f"{c}. {i['judul']}\nCategory: {i['kategori']}\n" - msgs += ( - f"πŸ’  Download\n\n" - if not re.search(r"Complete|Ongoing", i["kategori"]) - else "\n" - ) + msgs += f"πŸ’  Download\n\n" if not re.search(r"Complete|Ongoing", i["kategori"]) else "\n" if len(head.encode("utf-8") + msgs.encode("utf-8")) >= 4000: await msg.reply( head + msgs, @@ -917,11 +890,7 @@ async def gomov_scrap(_, msg): await m.delete() for c, i in enumerate(data, start=1): msgs += f"{c}. {i['judul']}\nGenre: {i['genre']}\n" - msgs += ( - f"Extract: /{msg.command[0]}_scrap {i['link']}\n\n" - if not re.search(r"Series", i["genre"]) - else "\n" - ) + msgs += f"Extract: /{msg.command[0]}_scrap {i['link']}\n\n" if not re.search(r"Series", i["genre"]) else "\n" if len(head.encode("utf-8") + msgs.encode("utf-8")) >= 4000: await msg.reply( head + msgs, @@ -967,9 +936,7 @@ async def gomov_scrap(_, msg): async def savefilm21_scrap(_, message): try: link = message.text.split(" ", maxsplit=1)[1] - headers = { - "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582" - } + headers = {"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"} html = await http.get(link, headers=headers) soup = BeautifulSoup(html.text, "lxml") @@ -990,9 +957,7 @@ async def savefilm21_scrap(_, message): ), ) except IndexError: - return await message.reply( - f"Gunakan command /{message.command[0]} [link] untuk scrap link download" - ) + return await message.reply(f"Gunakan command /{message.command[0]} [link] untuk scrap link download") except Exception as e: await message.reply(f"ERROR: {str(e)}") @@ -1002,18 +967,14 @@ async def savefilm21_scrap(_, message): async def nodrakor_scrap(_, message): try: link = message.text.split(" ", maxsplit=1)[1] - headers = { - "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582" - } + headers = {"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"} html = await http.get(link, headers=headers) soup = BeautifulSoup(html.text, "lxml") hasil = soup.find_all(class_="gmr-download-wrap clearfix")[0] await message.reply(f"Hasil Scrap dari {link}:\n{hasil}") except IndexError: - return await message.reply( - f"Gunakan command /{message.command[0]} [link] untuk scrap link download" - ) + return await message.reply(f"Gunakan command /{message.command[0]} [link] untuk scrap link download") except Exception as e: await message.reply(f"ERROR: {str(e)}") @@ -1024,9 +985,7 @@ async def nodrakor_scrap(_, message): async def muviku_scrap(_, message): try: link = message.text.split(" ", maxsplit=1)[1] - headers = { - "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582" - } + headers = {"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"} html = await http.get(link, headers=headers) soup = BeautifulSoup(html.text, "lxml") @@ -1043,9 +1002,7 @@ async def muviku_scrap(_, message): res = "".join(f"Host: {i['kualitas']}\n{i['link']}\n\n" for i in data) await message.reply(res) except IndexError: - return await message.reply( - f"Gunakan command /{message.command[0]} [link] untuk scrap link download" - ) + return await message.reply(f"Gunakan command /{message.command[0]} [link] untuk scrap link download") except Exception as e: await message.reply(f"ERROR: {str(e)}") @@ -1055,9 +1012,7 @@ async def muviku_scrap(_, message): async def melong_scrap(_, message): try: link = message.text.split(" ", maxsplit=1)[1] - headers = { - "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582" - } + headers = {"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"} html = await http.get(link, headers=headers) soup = BeautifulSoup(html.text, "lxml") @@ -1067,9 +1022,7 @@ async def melong_scrap(_, message): rep = f"{hardsub}\n{softsub}" await message.reply(rep) except IndexError: - await message.reply( - f"Gunakan command /{message.command[0]} [link] untuk scrap link download" - ) + await message.reply(f"Gunakan command /{message.command[0]} [link] untuk scrap link download") @app.on_message(filters.command(["gomov_scrap", "zonafilm_scrap"], COMMAND_HANDLER)) @@ -1077,9 +1030,7 @@ async def melong_scrap(_, message): async def gomov_zonafilm_dl(_, message): try: link = message.text.split(" ", maxsplit=1)[1] - headers = { - "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582" - } + headers = {"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"} html = await http.get(link, headers=headers) soup = BeautifulSoup(html.text, "lxml") @@ -1103,6 +1054,4 @@ async def gomov_zonafilm_dl(_, message): ), ) except IndexError: - await message.reply( - f"Gunakan command /{message.command[0]} [link] untuk scrap link download" - ) + await message.reply(f"Gunakan command /{message.command[0]} [link] untuk scrap link download") diff --git a/misskaty/plugins/stickers.py b/misskaty/plugins/stickers.py index 37de2791..7b7f2618 100644 --- a/misskaty/plugins/stickers.py +++ b/misskaty/plugins/stickers.py @@ -36,11 +36,7 @@ __HELP__ = """ def get_emoji_regex(): - e_list = [ - getattr(emoji, e).encode("unicode-escape").decode("ASCII") - for e in dir(emoji) - if not e.startswith("_") - ] + e_list = [getattr(emoji, e).encode("unicode-escape").decode("ASCII") for e in dir(emoji) if not e.startswith("_")] # to avoid re.error excluding char that start with '*' e_sort = sorted([x for x in e_list if not x.startswith("*")], reverse=True) # Sort emojis by length to make sure multi-character emojis are @@ -69,11 +65,7 @@ async def getsticker_(c, m): ) await m.reply_to_message.reply_document( document=sticker_file, - caption=( - f"Emoji: {sticker.emoji}\n" - f"Sticker ID: {sticker.file_id}\n\n" - f"Send by: @{BOT_USERNAME}" - ), + caption=(f"Emoji: {sticker.emoji}\n" f"Sticker ID: {sticker.file_id}\n\n" f"Send by: @{BOT_USERNAME}"), ) shutil.rmtree(tempdir, ignore_errors=True) else: @@ -83,11 +75,7 @@ async def getsticker_(c, m): @app.on_message(filters.command("stickerid", COMMAND_HANDLER) & filters.reply) async def getstickerid(c, m): if m.reply_to_message.sticker: - await m.reply_text( - "The ID of this sticker is: {stickerid}".format( - stickerid=m.reply_to_message.sticker.file_id - ) - ) + await m.reply_text("The ID of this sticker is: {stickerid}".format(stickerid=m.reply_to_message.sticker.file_id)) @app.on_message(filters.command("unkang", COMMAND_HANDLER) & filters.reply) @@ -106,9 +94,7 @@ async def getstickerid(c, m): except Exception as e: await pp.edit(f"Failed remove sticker from your pack.\n\nERR: {e}") else: - await m.reply_text( - f"Please reply sticker that created by {c.me.username} to remove sticker from your pack." - ) + await m.reply_text(f"Please reply sticker that created by {c.me.username} to remove sticker from your pack.") @app.on_message(filters.command(["curi", "kang"], COMMAND_HANDLER)) @@ -139,10 +125,7 @@ async def kang_sticker(c, m): if "image" in reply.document.mime_type: # mime_type: image/webp resize = True - elif ( - MessageMediaType.VIDEO == reply.document.mime_type - or MessageMediaType.ANIMATION == reply.document.mime_type - ): + elif MessageMediaType.VIDEO == reply.document.mime_type or MessageMediaType.ANIMATION == reply.document.mime_type: # mime_type: application/video videos = True convert = True @@ -172,10 +155,7 @@ async def kang_sticker(c, m): packname = f"{pack_prefix}{packnum}_{m.from_user.id}_by_{c.me.username}" if len(m.command) > 1: # matches all valid emojis in input - sticker_emoji = ( - "".join(set(EMOJI_PATTERN.findall("".join(m.command[1:])))) - or sticker_emoji - ) + sticker_emoji = "".join(set(EMOJI_PATTERN.findall("".join(m.command[1:])))) or sticker_emoji filename = await c.download_media(m.reply_to_message) if not filename: # Failed to download @@ -186,11 +166,7 @@ async def kang_sticker(c, m): filename = "sticker.png" packname = f"c{m.from_user.id}_by_{c.me.username}" img_url = next( - ( - m.text[y.offset : (y.offset + y.length)] - for y in m.entities - if y.type == "url" - ), + (m.text[y.offset : (y.offset + y.length)] for y in m.entities if y.type == "url"), None, ) @@ -210,15 +186,10 @@ async def kang_sticker(c, m): packnum = m.command.pop(2) packname = f"a{packnum}_{m.from_user.id}_by_{c.me.username}" if len(m.command) > 2: - sticker_emoji = ( - "".join(set(EMOJI_PATTERN.findall("".join(m.command[2:])))) - or sticker_emoji - ) + sticker_emoji = "".join(set(EMOJI_PATTERN.findall("".join(m.command[2:])))) or sticker_emoji resize = True else: - return await prog_msg.edit_text( - "Want me to guess the sticker? Please tag a sticker." - ) + return await prog_msg.edit_text("Want me to guess the sticker? Please tag a sticker.") try: if resize: filename = resize_image(filename) @@ -237,9 +208,7 @@ async def kang_sticker(c, m): ) if stickerset.set.count >= max_stickers: packnum += 1 - packname = ( - f"{pack_prefix}_{packnum}_{m.from_user.id}_by_{c.me.username}" - ) + packname = f"{pack_prefix}_{packnum}_{m.from_user.id}_by_{c.me.username}" else: packname_found = True except StickersetInvalid: @@ -319,9 +288,7 @@ async def kang_sticker(c, m): ) except BadRequest: - return await prog_msg.edit_text( - "Your Sticker Pack is full if your pack is not in v1 Type /kang 1, if it is not in v2 Type /kang 2 and so on." - ) + return await prog_msg.edit_text("Your Sticker Pack is full if your pack is not in v1 Type /kang 1, if it is not in v2 Type /kang 2 and so on.") except Exception as all_e: await prog_msg.edit_text(f"{all_e.__class__.__name__} : {all_e}") else: diff --git a/misskaty/plugins/sub_extractor.py b/misskaty/plugins/sub_extractor.py index 5a9c0c60..b59bb73e 100644 --- a/misskaty/plugins/sub_extractor.py +++ b/misskaty/plugins/sub_extractor.py @@ -4,7 +4,7 @@ * @projectName MissKatyPyro * Copyright @YasirPedia All rights reserved """ -from misskaty import app, BOT_USERNAME +from misskaty import app from logging import getLogger from pyrogram import filters from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup @@ -59,18 +59,12 @@ def get_subname(lang, url, format): async def ceksub(_, m): cmd = m.text.split(" ", 1) if len(cmd) == 1: - return await m.reply( - f"Gunakan command /{m.command[0]} [link] untuk mengecek subtitle dan audio didalam video." - ) + return await m.reply(f"Gunakan command /{m.command[0]} [link] untuk mengecek subtitle dan audio didalam video.") link = cmd[1] start_time = perf_counter() pesan = await m.reply("Sedang memproses perintah..", quote=True) try: - res = ( - await shell_exec( - f"ffprobe -loglevel 0 -print_format json -show_format -show_streams {link}" - ) - )[0] + res = (await shell_exec(f"ffprobe -loglevel 0 -print_format json -show_format -show_streams {link}"))[0] details = json.loads(res) buttons = [] for stream in details["streams"]: @@ -102,33 +96,20 @@ async def ceksub(_, m): reply_markup=InlineKeyboardMarkup(buttons), ) except Exception: - err = traceback.format_exc() - await pesan.edit( - f"Failed extract media, make sure your link is not protected by WAF or maybe inaccessible for bot." - ) + traceback.format_exc() + await pesan.edit(f"Failed extract media, make sure your link is not protected by WAF or maybe inaccessible for bot.") @app.on_message(filters.command(["converttosrt"], COMMAND_HANDLER)) @capture_err async def convertsrt(c, m): reply = m.reply_to_message - if ( - not reply - and reply.document - and ( - reply.document.file_name.endswith(".vtt") - or reply.document.file_name.endswith(".ass") - ) - ): - return await m.reply( - f"Use command /{m.command[0]} by reply to .ass or .vtt file, to convert subtitle from .ass or .vtt to srt." - ) + if not reply and reply.document and (reply.document.file_name.endswith(".vtt") or reply.document.file_name.endswith(".ass")): + return await m.reply(f"Use command /{m.command[0]} by reply to .ass or .vtt file, to convert subtitle from .ass or .vtt to srt.") msg = await m.reply("⏳ Converting...") dl = await reply.download() filename = dl.split("/", 3)[3] - LOGGER.info( - f"ConvertSub: {filename} by {m.from_user.first_name} [{m.from_user.id}]" - ) + LOGGER.info(f"ConvertSub: {filename} by {m.from_user.first_name} [{m.from_user.id}]") (await shell_exec(f"mediaextract -i '{dl}' '{filename}.srt'"))[0] c_time = time() await m.reply_document( @@ -168,12 +149,8 @@ async def stream_extract(bot, update): format = "srt" start_time = perf_counter() namafile = get_subname(lang, link, format) - LOGGER.info( - f"ExtractSub: {namafile} by {update.from_user.first_name} [{update.from_user.id}]" - ) - extract = (await shell_exec(f"mediaextract -i {link} -map {map} '{namafile}'"))[ - 0 - ] + LOGGER.info(f"ExtractSub: {namafile} by {update.from_user.first_name} [{update.from_user.id}]") + extract = (await shell_exec(f"mediaextract -i {link} -map {map} '{namafile}'"))[0] end_time = perf_counter() timelog = "{:.2f}".format(end_time - start_time) + " second" c_time = time() diff --git a/misskaty/plugins/ytdl_download_new.py b/misskaty/plugins/ytdl_download_new.py index d6a9ea45..1e0e1cab 100644 --- a/misskaty/plugins/ytdl_download_new.py +++ b/misskaty/plugins/ytdl_download_new.py @@ -15,9 +15,7 @@ from iytdl import iYTDL, main from uuid import uuid4 LOGGER = getLogger(__name__) -regex = recompile( - r"(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/(watch\?v=|embed/|v/|.+\?v=)?(?P[A-Za-z0-9\-=_]{11})" -) +regex = recompile(r"(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/(watch\?v=|embed/|v/|.+\?v=)?(?P[A-Za-z0-9\-=_]{11})") YT_DB = {} @@ -65,9 +63,7 @@ async def ytdownv2(_, message): if len(message.command) == 1: return await message.reply("Please input a valid YT-DLP Supported URL") url = message.text.split(" ", maxsplit=1)[1] - async with iYTDL( - log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract" - ) as ytdl: + async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: x = await ytdl.parse(url) if x is None: return await message.reply("Failed parse URL, check logs..") @@ -82,13 +78,9 @@ async def ytdl_listall_callback(_, cq: CallbackQuery): if cq.from_user.id != cq.message.reply_to_message.from_user.id: return await cq.answer("Not your task", True) callback = cq.data.split("|") - async with iYTDL( - log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract" - ) as ytdl: + async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: media, buttons = await ytdl.listview(callback[1]) - await cq.edit_message_media( - media=media, reply_markup=buttons.add(cq.from_user.id) - ) + await cq.edit_message_media(media=media, reply_markup=buttons.add(cq.from_user.id)) @app.on_callback_query(filters.regex(r"^yt_extract_info")) @@ -97,9 +89,7 @@ async def ytdl_extractinfo_callback(_, cq: CallbackQuery): return await cq.answer("Not your task", True) await cq.answer("Please Wait...") callback = cq.data.split("|") - async with iYTDL( - log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract" - ) as ytdl: + async with iYTDL(log_group_id=0, cache_path="cache", ffmpeg_location="/usr/bin/mediaextract") as ytdl: if data := await ytdl.extract_info_from_key(callback[1]): if len(key) == 11: await cq.edit_message_text( @@ -125,10 +115,7 @@ async def ytdl_gendl_callback(_, cq: CallbackQuery): callback = cq.data.split("|") key = callback[1] if callback[0] == "yt_gen": - if ( - match := regex.match(cq.message.reply_to_message.command[1]) - or len(callback) == 2 - ): + if match := regex.match(cq.message.reply_to_message.command[1]) or len(callback) == 2: x = await main.Extractor().get_download_button(key) await cq.edit_message_caption(caption=x.caption, reply_markup=x.buttons) else: @@ -144,9 +131,7 @@ async def ytdl_gendl_callback(_, cq: CallbackQuery): ffmpeg_location="/usr/bin/mediaextract", delete_media=True, ) as ytdl: - upload_key = await ytdl.download( - cq.message.reply_to_message.command[1], uid, format_, cq, True, 3 - ) + upload_key = await ytdl.download(cq.message.reply_to_message.command[1], uid, format_, cq, True, 3) await ytdl.upload(app, upload_key, format_, cq, True) else: uid = callback[2] @@ -161,9 +146,7 @@ async def ytdl_gendl_callback(_, cq: CallbackQuery): ffmpeg_location="/usr/bin/mediaextract", delete_media=True, ) as ytdl: - upload_key = await ytdl.download( - "https://www.youtube.com/watch?v=" + key, uid, format_, cq, True, 3 - ) + upload_key = await ytdl.download("https://www.youtube.com/watch?v=" + key, uid, format_, cq, True, 3) await ytdl.upload(app, upload_key, format_, cq, True) @@ -184,9 +167,7 @@ async def ytdl_scroll_callback(_, cq: CallbackQuery): out += f"\n❯ Uploader: {i['channel']['name']}\n\n" scroll_btn = [ [ - InlineKeyboardButton( - f"Back", callback_data=f"ytdl_scroll|{search_key}|{page-1}" - ), + InlineKeyboardButton(f"Back", callback_data=f"ytdl_scroll|{search_key}|{page-1}"), InlineKeyboardButton( f"{page+1}/{len(search['result'])}", callback_data=f"ytdl_scroll|{search_key}|{page+1}", @@ -201,9 +182,7 @@ async def ytdl_scroll_callback(_, cq: CallbackQuery): scroll_btn = [[scroll_btn.pop().pop(0)]] btn = [[InlineKeyboardButton("Download", callback_data=f"yt_gen|{i['id']}")]] btn = InlineKeyboardMarkup(scroll_btn + btn) - await cq.edit_message_media( - InputMediaPhoto(await get_ytthumb(i["id"]), caption=out), reply_markup=btn - ) + await cq.edit_message_media(InputMediaPhoto(await get_ytthumb(i["id"]), caption=out), reply_markup=btn) async def get_ytthumb(videoid: str):