From 983db698641b8c58c1a549401c74b2d35e4c8326 Mon Sep 17 00:00:00 2001 From: Yasir Aris M Date: Mon, 15 Jan 2024 07:33:05 +0700 Subject: [PATCH] 'Refactored by Sourcery' Co-authored-by: Sourcery AI <> --- database/blacklist_db.py | 4 +- database/feds_db.py | 59 +++------ database/users_chats_db.py | 4 +- misskaty/core/misskaty_patch/bound/message.py | 4 +- misskaty/helper/functions.py | 6 +- misskaty/helper/kuso_utils.py | 11 +- misskaty/helper/misc.py | 9 +- misskaty/helper/sqlite_helper.py | 9 +- misskaty/helper/tools.py | 14 +- misskaty/plugins/admin.py | 2 +- misskaty/plugins/dev.py | 4 +- misskaty/plugins/download_upload.py | 11 +- misskaty/plugins/federation.py | 124 +++++++----------- misskaty/plugins/filters.py | 22 ++-- misskaty/plugins/fun.py | 2 +- misskaty/plugins/imdb_search.py | 12 +- misskaty/plugins/misc_tools.py | 88 ++++++------- misskaty/plugins/notes.py | 39 +++--- misskaty/plugins/quotly.py | 4 +- misskaty/plugins/web_scraper.py | 7 +- misskaty/vars.py | 17 +-- update.py | 1 - 22 files changed, 178 insertions(+), 275 deletions(-) diff --git a/database/blacklist_db.py b/database/blacklist_db.py index d05ec9cc..c7524aee 100644 --- a/database/blacklist_db.py +++ b/database/blacklist_db.py @@ -7,9 +7,7 @@ blacklist_filtersdb = dbname["blacklistFilters"] async def get_blacklisted_words(chat_id: int) -> List[str]: _filters = await blacklist_filtersdb.find_one({"chat_id": chat_id}) - if not _filters: - return [] - return _filters["filters"] + return [] if not _filters else _filters["filters"] async def save_blacklist_filter(chat_id: int, word: str): diff --git a/database/feds_db.py b/database/feds_db.py index 7a26aef3..f5a19649 100644 --- a/database/feds_db.py +++ b/database/feds_db.py @@ -9,9 +9,7 @@ fedsdb = dbname["federation"] def get_fed_info(fed_id): get = fedsdb.find_one({"fed_id": str(fed_id)}) - if get is None: - return False - return get + return False if get is None else get async def get_fed_id(chat_id): @@ -19,12 +17,14 @@ async def get_fed_id(chat_id): if get is None: return False - else: - for chat_info in get.get("chat_ids", []): - if chat_info["chat_id"] == int(chat_id): - return get["fed_id"] - - return False + return next( + ( + get["fed_id"] + for chat_info in get.get("chat_ids", []) + if chat_info["chat_id"] == int(chat_id) + ), + False, + ) async def get_feds_by_owner(owner_id): @@ -32,10 +32,9 @@ async def get_feds_by_owner(owner_id): feds = await cursor.to_list(length=None) if not feds: return False - federations = [ + return [ {"fed_id": fed["fed_id"], "fed_name": fed["fed_name"]} for fed in feds ] - return federations async def transfer_owner(fed_id, current_owner_id, new_owner_id): @@ -58,10 +57,7 @@ async def set_log_chat(fed_id, log_group_id: int): async def get_fed_name(chat_id): get = await fedsdb.find_one(int(chat_id)) - if get is None: - return False - else: - return get["fed_name"] + return False if get is None else get["fed_name"] async def is_user_fed_owner(fed_id, user_id: int): @@ -69,19 +65,13 @@ async def is_user_fed_owner(fed_id, user_id: int): if not getfed: return False owner_id = getfed["owner_id"] - if user_id == owner_id or user_id not in SUDO: - return True - else: - return False + return user_id == owner_id or user_id not in SUDO async def search_fed_by_id(fed_id): get = await fedsdb.find_one({"fed_id": str(fed_id)}) - if get is not None: - return get - else: - return False + return get if get is not None else False def chat_join_fed(fed_id, chat_name, chat_id): @@ -96,41 +86,26 @@ async def chat_leave_fed(chat_id): {"chat_ids.chat_id": int(chat_id)}, {"$pull": {"chat_ids": {"chat_id": int(chat_id)}}}, ) - if result.modified_count > 0: - return True - else: - return False + return result.modified_count > 0 async def user_join_fed(fed_id, user_id): result = await fedsdb.update_one( {"fed_id": fed_id}, {"$addToSet": {"fadmins": int(user_id)}}, upsert=True ) - if result.modified_count > 0: - return True - else: - return False + return result.modified_count > 0 async def user_demote_fed(fed_id, user_id): result = await fedsdb.update_one( {"fed_id": fed_id}, {"$pull": {"fadmins": int(user_id)}} ) - if result.modified_count > 0: - return True - else: - return False + return result.modified_count > 0 async def search_user_in_fed(fed_id, user_id): getfed = await search_fed_by_id(fed_id) - if getfed is None: - return False - fadmins = getfed["fadmins"] - if user_id in fadmins: - return True - else: - return False + return False if getfed is None else user_id in getfed["fadmins"] async def chat_id_and_names_in_fed(fed_id): diff --git a/database/users_chats_db.py b/database/users_chats_db.py index a636e1ef..50eefa82 100644 --- a/database/users_chats_db.py +++ b/database/users_chats_db.py @@ -51,9 +51,7 @@ class UsersData: async def get_ban_status(self, id): user = await self.col.find_one({"_id": int(id)}) - if user: - return True, user - return False, None + return (True, user) if user else (False, None) async def get_all_users(self): return self.col.find({}) diff --git a/misskaty/core/misskaty_patch/bound/message.py b/misskaty/core/misskaty_patch/bound/message.py index 18d487ab..935206a1 100644 --- a/misskaty/core/misskaty_patch/bound/message.py +++ b/misskaty/core/misskaty_patch/bound/message.py @@ -24,9 +24,7 @@ LOGGER = getLogger("MissKaty") @property def parse_cmd(msg): - if len(msg.command) > 1: - return msg.text.split(None, 1)[1] - return None + return msg.text.split(None, 1)[1] if len(msg.command) > 1 else None async def reply_text( diff --git a/misskaty/helper/functions.py b/misskaty/helper/functions.py index b8923e51..84ce56af 100644 --- a/misskaty/helper/functions.py +++ b/misskaty/helper/functions.py @@ -24,7 +24,11 @@ def extract_urls(reply_markup): for i, row in enumerate(buttons): for j, button in enumerate(row): if button.url: - name = "\n~\nbutton" if i * len(row) + j + 1 == 1 else f"button{i * len(row) + j + 1}" + name = ( + "\n~\nbutton" + if i * len(row) + j == 0 + else f"button{i * len(row) + j + 1}" + ) urls.append((f"{name}", button.text, button.url)) return urls diff --git a/misskaty/helper/kuso_utils.py b/misskaty/helper/kuso_utils.py index 1c9fc9f8..91daa1af 100644 --- a/misskaty/helper/kuso_utils.py +++ b/misskaty/helper/kuso_utils.py @@ -18,7 +18,7 @@ async def kusonimeBypass(url: str): result = {} page = await fetch.get(url) if page.status_code != 200: - raise Exception(f"ERROR: Hostname might be blocked by server!") + raise Exception("ERROR: Hostname might be blocked by server!") try: soup = BeautifulSoup(page.text, "lxml") thumb = soup.find("div", {"class": "post-thumb"}).find("img").get("src") @@ -96,7 +96,7 @@ async def kusonimeBypass(url: str): num += 1 result.update({"title": title, "thumb": thumb, "genre": genre, "genre_string": ", ".join(genre), "status_anime": status_anime, "season": season, "tipe": tipe, "ep": ep, "score": score, "duration": duration, "rilis": rilis, "data": data}) except Exception as e: - if len(result) != 0: + if result: result.clear() err = traceback.format_exc() LOGGER.error(f"class: {e.__class__.__name_}, {err}") @@ -133,10 +133,11 @@ async def byPassPh(url: str, name: str) -> Optional[str]:
{{/data}} """.strip() - plink = await post_to_telegraph( - False, f"{kusonime.get('title')} By {escape(name)}", chevron.render(template, kusonime) + return await post_to_telegraph( + False, + f"{kusonime.get('title')} By {escape(name)}", + chevron.render(template, kusonime), ) - return plink class Kusonime: diff --git a/misskaty/helper/misc.py b/misskaty/helper/misc.py index 0efe464a..dc3a4f56 100644 --- a/misskaty/helper/misc.py +++ b/misskaty/helper/misc.py @@ -76,11 +76,12 @@ def paginate_modules(page_n, module_dict, prefix, chat=None): ) ] else: - pairs = pairs[modulo_page * COLUMN_SIZE : COLUMN_SIZE * (modulo_page + 1)] + [ - ( + pairs = pairs[ + modulo_page * COLUMN_SIZE : COLUMN_SIZE * (modulo_page + 1) + ] + [ + ( EqInlineKeyboardButton( - "Back", - callback_data="{}_home({})".format(prefix, modulo_page), + "Back", callback_data=f"{prefix}_home({modulo_page})" ), ) ] diff --git a/misskaty/helper/sqlite_helper.py b/misskaty/helper/sqlite_helper.py index 3285a625..5b7fe3b7 100644 --- a/misskaty/helper/sqlite_helper.py +++ b/misskaty/helper/sqlite_helper.py @@ -173,9 +173,7 @@ class Cache: @staticmethod def _exp_datetime(exp: float) -> Optional[datetime]: - if exp == -1.0: - return None - return datetime.utcfromtimestamp(exp) + return None if exp == -1.0 else datetime.utcfromtimestamp(exp) def _stream(self, value: Any) -> bytes: return pickle.dumps(value, protocol=self.PICKLE_PROTOCOL) @@ -407,10 +405,7 @@ class Cache: def get_all(self) -> Dict[str, Any]: """Get all key-value pairs from the cache.""" all_data = self._con.execute("SELECT key, value FROM cache;").fetchall() - results = {} - for key, value in all_data: - results[key] = self._unstream(value) - return results + return {key: self._unstream(value) for key, value in all_data} def clear(self) -> None: """Clear the cache from all values.""" diff --git a/misskaty/helper/tools.py b/misskaty/helper/tools.py index 75932999..70323f3b 100644 --- a/misskaty/helper/tools.py +++ b/misskaty/helper/tools.py @@ -161,18 +161,10 @@ def isValidURL(str): "{2,256}\\.[a-z]" + "{2,6}\\b([-a-zA-Z0-9@:%" + "._\\+~#?&//=]*)") - + # Compile the ReGex p = re.compile(regex) - + # If the string is empty # return false - if (str == None): - return False - - # Return if the string - # matched the ReGex - if(re.search(p, str)): - return True - else: - return False \ No newline at end of file + return False if str is None else bool((re.search(p, str))) \ No newline at end of file diff --git a/misskaty/plugins/admin.py b/misskaty/plugins/admin.py index 780216f7..834b3762 100644 --- a/misskaty/plugins/admin.py +++ b/misskaty/plugins/admin.py @@ -863,7 +863,6 @@ async def set_chat_photo(_, ctx: Message): @app.on_message(filters.group & filters.command('mentionall', COMMAND_HANDLER)) async def mentionall(app: Client, msg: Message): - NUM = 4 user = await msg.chat.get_member(msg.from_user.id) if user.status in (enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR): total = [] @@ -874,6 +873,7 @@ async def mentionall(app: Client, msg: Message): else: total.append(member.user.mention()) + NUM = 4 for i in range(0, len(total), NUM): message = ' '.join(total[i:i+NUM]) await app.send_message(msg.chat.id, message, message_thread_id=msg.message_thread_id) diff --git a/misskaty/plugins/dev.py b/misskaty/plugins/dev.py index 8469436b..c4628b50 100644 --- a/misskaty/plugins/dev.py +++ b/misskaty/plugins/dev.py @@ -126,10 +126,8 @@ async def log_file(_, ctx: Message, strings): @app.on_message(filters.command(["donate"], COMMAND_HANDLER)) async def donate(self: Client, ctx: Message): - try: + with contextlib.suppress(ReactionInvalid): await ctx.react(emoji="❤️") - except ReactionInvalid: - pass try: await ctx.reply_photo( "https://img.yasirweb.eu.org/file/9427d61d6968b8ee4fb2f.jpg", diff --git a/misskaty/plugins/download_upload.py b/misskaty/plugins/download_upload.py index c1de743f..566109c8 100644 --- a/misskaty/plugins/download_upload.py +++ b/misskaty/plugins/download_upload.py @@ -242,7 +242,12 @@ async def twitterdl(_, message): "ts": "1691416179", "source": "form", } - post = await fetch.post(f"https://ssstwitter.com/id", data=data, headers=headers, follow_redirects=True) + post = await fetch.post( + "https://ssstwitter.com/id", + data=data, + headers=headers, + follow_redirects=True, + ) if post.status_code not in [200, 401]: return await msg.edit_msg("Unknown error.") soup = BeautifulSoup(post.text, "lxml") @@ -275,7 +280,9 @@ async def tiktokdl(_, message): msg = await message.reply("Trying download...") try: r = ( - await fetch.post(f"https://lovetik.com/api/ajax/search", data={"query": link}) + await fetch.post( + "https://lovetik.com/api/ajax/search", data={"query": link} + ) ).json() fname = (await fetch.head(r["links"][0]["a"])).headers.get("content-disposition", "") filename = unquote(fname.split('filename=')[1].strip('"').split('"')[0]) diff --git a/misskaty/plugins/federation.py b/misskaty/plugins/federation.py index e1c918ec..681ac5b5 100644 --- a/misskaty/plugins/federation.py +++ b/misskaty/plugins/federation.py @@ -61,11 +61,11 @@ async def new_fed(self, message): if len(message.command) < 2: return await message.reply_msg("Please write the name of the federation!") fednam = message.text.split(None, 1)[1] - if not fednam == "": + if fednam != "": fed_id = str(uuid.uuid4()) fed_name = fednam x = await fedsdb.update_one( - {"fed_id": str(fed_id)}, + {"fed_id": fed_id}, { "$set": { "fed_name": str(fed_name), @@ -85,17 +85,13 @@ async def new_fed(self, message): ) await message.reply_msg( - "**You have succeeded in creating a new federation!**" - "\nName: `{}`" - "\nID: `{}`" - "\n\nUse the command below to join the federation:" - "\n`/joinfed {}`".format(fed_name, fed_id, fed_id), + f"**You have succeeded in creating a new federation!**\nName: `{fed_name}`\nID: `{fed_id}`\n\nUse the command below to join the federation:\n`/joinfed {fed_id}`", parse_mode=ParseMode.MARKDOWN, ) try: await app.send_message( LOG_GROUP_ID, - "New Federation: {}\nID:
{}
".format(fed_name, fed_id), + f"New Federation: {fed_name}\nID:
{fed_id}
", parse_mode=ParseMode.HTML, ) except: @@ -114,24 +110,21 @@ async def del_fed(client, message): "Federations can only be deleted by privately messaging me." ) args = message.text.split(" ", 1) - if len(args) > 1: - is_fed_id = args[1].strip() - getinfo = await get_fed_info(is_fed_id) - if getinfo is False: - return await message.reply_text("This federation does not exist.") - if getinfo["owner_id"] == user.id or user.id not in SUDO: - fed_id = is_fed_id - else: - return await message.reply_text("Only federation owners can do this!") - else: + if len(args) <= 1: return await message.reply_text("What should I delete?") + is_fed_id = args[1].strip() + getinfo = await get_fed_info(is_fed_id) + if getinfo is False: + return await message.reply_text("This federation does not exist.") + if getinfo["owner_id"] == user.id or user.id not in SUDO: + fed_id = is_fed_id + else: + return await message.reply_text("Only federation owners can do this!") is_owner = await is_user_fed_owner(fed_id, user.id) if is_owner is False: return await message.reply_text("Only federation owners can do this!") await message.reply_text( - "You sure you want to delete your federation? This cannot be reverted, you will lose your entire ban list, and '{}' will be permanently lost.".format( - getinfo["fed_name"] - ), + f"""You sure you want to delete your federation? This cannot be reverted, you will lose your entire ban list, and '{getinfo["fed_name"]}' will be permanently lost.""", reply_markup=InlineKeyboardMarkup( [ [ @@ -245,10 +238,10 @@ async def fed_log(client, message): "Send this command on the chat which you need to set as fed log channel." ) member = await app.get_chat_member(chat.id, user.id) - if ( - member.status == ChatMemberStatus.OWNER - or member.status == ChatMemberStatus.ADMINISTRATOR - ): + if member.status in [ + ChatMemberStatus.OWNER, + ChatMemberStatus.ADMINISTRATOR, + ]: if len(message.command) < 2: return await message.reply_text( "Please provide the Id of the federation with the command!" @@ -258,10 +251,7 @@ async def fed_log(client, message): if info is False: return await message.reply_text("This federation does not exist.") if await is_user_fed_owner(fed_id, user.id): - if "/unsetfedlog" in message.text: - log_group_id = LOG_GROUP_ID - else: - log_group_id = chat.id + log_group_id = LOG_GROUP_ID if "/unsetfedlog" in message.text else chat.id loged = await set_log_chat(fed_id, log_group_id) if "/unsetfedlog" in message.text: return await message.reply_text("log channel removed successfully.") @@ -281,21 +271,17 @@ async def fed_chat(self, message): fed_id = await get_fed_id(chat.id) member = await self.get_chat_member(chat.id, user.id) - if ( - member.status == ChatMemberStatus.OWNER - or member.status == ChatMemberStatus.ADMINISTRATOR - ): - pass - else: + if member.status not in [ + ChatMemberStatus.OWNER, + ChatMemberStatus.ADMINISTRATOR, + ]: return await message.reply_text("You must be an admin to execute this command") if not fed_id: return await message.reply_text("This group is not in any federation!") info = await get_fed_info(fed_id) - text = "This group is part of the following federation:" - text += "\n{} (ID: {})".format(info["fed_name"], fed_id) - + text = f'This group is part of the following federation:\n{info["fed_name"]} (ID: {fed_id})' await message.reply_text(text, parse_mode=ParseMode.HTML) @@ -312,13 +298,10 @@ async def join_fed(self, message): member = await self.get_chat_member(chat.id, user.id) fed_id = await get_fed_id(int(chat.id)) - if user.id in SUDO: - pass - else: - if member.status == ChatMemberStatus.OWNER: - pass - else: - return await message.reply_text("Only group creators can use this command!") + if ( + user.id in SUDO or member.status != ChatMemberStatus.OWNER + ) and user.id not in SUDO: + return await message.reply_text("Only group creators can use this command!") if fed_id: return await message.reply_text("You cannot join two federations from one chat") args = message.text.split(" ", 1) @@ -334,18 +317,15 @@ async def join_fed(self, message): f"Failed to join federation! Please contact {SUPPORT_CHAT} if this problem persists!" ) - get_fedlog = getfed["log_group_id"] - if get_fedlog: + if get_fedlog := getfed["log_group_id"]: await app.send_message( get_fedlog, - "Chat **{}** has joined the federation **{}**".format( - chat.title, getfed["fed_name"] - ), + f'Chat **{chat.title}** has joined the federation **{getfed["fed_name"]}**', parse_mode=ParseMode.MARKDOWN, ) await message.reply_text( - "This group has joined the federation: {}!".format(getfed["fed_name"]) + f'This group has joined the federation: {getfed["fed_name"]}!' ) else: await message.reply_text( @@ -370,17 +350,14 @@ async def leave_fed(client, message): member = await app.get_chat_member(chat.id, user.id) if member.status == ChatMemberStatus.OWNER or user.id in SUDO: if await chat_leave_fed(int(chat.id)) is True: - get_fedlog = fed_info["log_group_id"] - if get_fedlog: + if get_fedlog := fed_info["log_group_id"]: await app.send_message( get_fedlog, - "Chat **{}** has left the federation **{}**".format( - chat.title, fed_info["fed_name"] - ), + f'Chat **{chat.title}** has left the federation **{fed_info["fed_name"]}**', parse_mode=ParseMode.MARKDOWN, ) await message.reply_text( - "This group has left the federation {}!".format(fed_info["fed_name"]), + f'This group has left the federation {fed_info["fed_name"]}!' ) else: await message.reply_text( @@ -412,9 +389,7 @@ async def fed_chat(client, message): fed_owner = info["owner_id"] fed_admins = info["fadmins"] all_admins = [fed_owner] + fed_admins + [int(BOT_ID)] - if user.id in all_admins or user.id in SUDO: - pass - else: + if user.id not in all_admins and user.id not in SUDO: return await message.reply_text( "You need to be a Fed Admin to use this command" ) @@ -602,12 +577,10 @@ async def fban_user(client, message): if not fed_id: return await message.reply_text("**This chat is not a part of any federation.") info = await get_fed_info(fed_id) - fed_owner = info["owner_id"] fed_admins = info["fadmins"] + fed_owner = info["owner_id"] all_admins = [fed_owner] + fed_admins + [int(BOT_ID)] - if from_user.id in all_admins or from_user.id in SUDO: - pass - else: + if from_user.id not in all_admins and from_user.id not in SUDO: return await message.reply_text( "You need to be a Fed Admin to use this command" ) @@ -660,8 +633,7 @@ async def fban_user(client, message): try: await app.send_message( user.id, - f"Hello, You have been fed banned by {from_user.mention}," - + " You can appeal for this ban by talking to him.", + f"Hello, You have been fed banned by {from_user.mention}, You can appeal for this ban by talking to him.", ) except Exception: pass @@ -702,12 +674,10 @@ async def funban_user(client, message): if not fed_id: return await message.reply_text("**This chat is not a part of any federation.") info = await get_fed_info(fed_id) - fed_owner = info["owner_id"] fed_admins = info["fadmins"] + fed_owner = info["owner_id"] all_admins = [fed_owner] + fed_admins + [int(BOT_ID)] - if from_user.id in all_admins or from_user.id in SUDO: - pass - else: + if from_user.id not in all_admins and from_user.id not in SUDO: return await message.reply_text( "You need to be a Fed Admin to use this command" ) @@ -755,8 +725,7 @@ async def funban_user(client, message): try: await app.send_message( user.id, - f"Hello, You have been fed unbanned by {from_user.mention}," - + " You can thank him for his action.", + f"Hello, You have been fed unbanned by {from_user.mention}, You can thank him for his action.", ) except Exception: pass @@ -838,9 +807,7 @@ async def fbroadcast_message(client, message): fed_owner = info["owner_id"] fed_admins = info["fadmins"] all_admins = [fed_owner] + fed_admins + [int(BOT_ID)] - if from_user.id in all_admins or from_user.id in SUDO: - pass - else: + if from_user.id not in all_admins and from_user.id not in SUDO: return await message.reply_text( "You need to be a Fed Admin to use this command" ) @@ -891,12 +858,9 @@ async def del_fed_button(client, cb): getfed = await get_fed_info(fed_id) if getfed: - delete = fedsdb.delete_one({"fed_id": str(fed_id)}) - if delete: + if delete := fedsdb.delete_one({"fed_id": str(fed_id)}): await cb.message.edit_text( - "You have removed your Federation! Now all the Groups that are connected with `{}` do not have a Federation.".format( - getfed["fed_name"] - ), + f'You have removed your Federation! Now all the Groups that are connected with `{getfed["fed_name"]}` do not have a Federation.', parse_mode=ParseMode.MARKDOWN, ) diff --git a/misskaty/plugins/filters.py b/misskaty/plugins/filters.py index c249d110..4c74d66d 100644 --- a/misskaty/plugins/filters.py +++ b/misskaty/plugins/filters.py @@ -75,13 +75,12 @@ async def save_filters(_, message): data = text[1].strip() if replied_message.sticker or replied_message.video_note: data = None + elif replied_message.sticker or replied_message.video_note: + data = None + elif not replied_message.text and not replied_message.caption: + data = None else: - if replied_message.sticker or replied_message.video_note: - data = None - elif not replied_message.text and not replied_message.caption: - data = None - else: - data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown + data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown if replied_message.text: _type = "text" file_id = None @@ -109,9 +108,8 @@ async def save_filters(_, message): if replied_message.voice: _type = "voice" file_id = replied_message.voice.file_id - if replied_message.reply_markup and not "~" in data: - urls = extract_urls(replied_message.reply_markup) - if urls: + if replied_message.reply_markup and "~" not in data: + if urls := extract_urls(replied_message.reply_markup): response = "\n".join([f"{name}=[{text}, {url}]" for name, text, url in urls]) data = data + response name = name.replace("_", " ") @@ -175,11 +173,9 @@ async def filters_re(_, message): keyb = None if data: if re.findall(r"\[.+\,.+\]", data): - keyboard = extract_text_and_keyb(ikb, data) - if keyboard: + if keyboard := extract_text_and_keyb(ikb, data): data, keyb = keyboard - replied_message = message.reply_to_message - if replied_message: + if replied_message := message.reply_to_message: if text.startswith("~"): await message.delete() if replied_message.from_user.id != message.from_user.id: diff --git a/misskaty/plugins/fun.py b/misskaty/plugins/fun.py index d42c0a8c..79496b80 100644 --- a/misskaty/plugins/fun.py +++ b/misskaty/plugins/fun.py @@ -204,7 +204,7 @@ async def givereact(c, m): return await m.reply("Please add reaction after command, you can give multiple reaction too.") if not m.reply_to_message: return await m.reply("Please reply to the message you want to react to.") - emot = [emoji for emoji in regex.findall(r'\p{Emoji}', m.text)] + emot = list(regex.findall(r'\p{Emoji}', m.text)) try: await m.reply_to_message.react(emoji=emot) except ReactionInvalid: diff --git a/misskaty/plugins/imdb_search.py b/misskaty/plugins/imdb_search.py index e2bd4450..6d9c4158 100644 --- a/misskaty/plugins/imdb_search.py +++ b/misskaty/plugins/imdb_search.py @@ -93,12 +93,10 @@ async def imdblangset(_, query: CallbackQuery): InlineButton("🗑 Remove UserSetting", f"setimdb#rm#{query.from_user.id}") ) buttons.row(InlineButton("❌ Close", f"close#{query.from_user.id}")) - try: + with contextlib.suppress(MessageIdInvalid, MessageNotModified): await query.message.edit_caption( "Please select available language below..", reply_markup=buttons ) - except (MessageIdInvalid, MessageNotModified): - pass @app.on_cb("setimdb") @@ -109,7 +107,7 @@ async def imdbsetlang(_, query: CallbackQuery): _, langset = await is_imdbset(query.from_user.id) if langset == lang: return await query.answer(f"⚠️ Your Setting Already in ({langset})!", True) - try: + with contextlib.suppress(MessageIdInvalid, MessageNotModified): if lang == "eng": await add_imdbset(query.from_user.id, lang) await query.message.edit_caption( @@ -125,8 +123,6 @@ async def imdbsetlang(_, query: CallbackQuery): await query.message.edit_caption( "UserSetting for IMDB has been deleted from database." ) - except (MessageIdInvalid, MessageNotModified): - pass async def imdb_search_id(kueri, message): @@ -265,12 +261,10 @@ async def imdbcari(_, query: CallbackQuery): del LIST_CARI[msg] except KeyError: return await query.message.edit_caption("⚠️ Callback Query Sudah Expired!") - try: + with contextlib.suppress(MessageIdInvalid, MessageNotModified): await query.message.edit_caption( "🔎 Sedang mencari di Database IMDB.." ) - except (MessageIdInvalid, MessageNotModified): - pass msg = "" buttons = InlineKeyboard(row_width=4) with contextlib.redirect_stdout(sys.stderr): diff --git a/misskaty/plugins/misc_tools.py b/misskaty/plugins/misc_tools.py index 5226e42b..2297358a 100644 --- a/misskaty/plugins/misc_tools.py +++ b/misskaty/plugins/misc_tools.py @@ -84,25 +84,25 @@ def calcExpression(text): def calc_btn(uid): - CALCULATE_BUTTONS = InlineKeyboardMarkup( + return InlineKeyboardMarkup( [ [ InlineKeyboardButton("DEL", callback_data=f"calc|{uid}|DEL"), InlineKeyboardButton("AC", callback_data=f"calc|{uid}|AC"), InlineKeyboardButton("(", callback_data=f"calc|{uid}|("), - InlineKeyboardButton(")", callback_data=f"calc|{uid}|)") + InlineKeyboardButton(")", callback_data=f"calc|{uid}|)"), ], [ InlineKeyboardButton("7", callback_data=f"calc|{uid}|7"), InlineKeyboardButton("8", callback_data=f"calc|{uid}|8"), InlineKeyboardButton("9", callback_data=f"calc|{uid}|9"), - InlineKeyboardButton("÷", callback_data=f"calc|{uid}|/") + InlineKeyboardButton("÷", callback_data=f"calc|{uid}|/"), ], [ InlineKeyboardButton("4", callback_data=f"calc|{uid}|4"), InlineKeyboardButton("5", callback_data=f"calc|{uid}|5"), InlineKeyboardButton("6", callback_data=f"calc|{uid}|6"), - InlineKeyboardButton("×", callback_data=f"calc|{uid}|*") + InlineKeyboardButton("×", callback_data=f"calc|{uid}|*"), ], [ InlineKeyboardButton("1", callback_data=f"calc|{uid}|1"), @@ -115,10 +115,9 @@ def calc_btn(uid): InlineKeyboardButton("0", callback_data=f"calc|{uid}|0"), InlineKeyboardButton("=", callback_data=f"calc|{uid}|="), InlineKeyboardButton("+", callback_data=f"calc|{uid}|+"), - ] + ], ] ) - return CALCULATE_BUTTONS @app.on_message(filters.command(["calc", "calculate", "calculator"])) @@ -134,44 +133,43 @@ async def calculate_handler(self, ctx): @app.on_callback_query(filters.regex("^calc")) async def calc_cb(self, query): - _, uid, data = query.data.split("|") - if query.from_user.id != int(uid): - return await query.answer("Who are you??", show_alert=True, cache_time=5) - try: - text = query.message.text.split("\n")[0].strip().split("=")[0].strip() - text = '' if f"Made by @{self.me.username}" in text else text - inpt = text + query.data - result = "" - if data == "=": - result = calcExpression(text) - text = "" - elif data == "DEL": - text = text[:-1] - elif data == "AC": - text = "" - else: - dot_dot_check = re.findall(r"(\d*\.\.|\d*\.\d+\.)", inpt) - opcheck = re.findall(r"([*/\+-]{2,})", inpt) - if not dot_dot_check and not opcheck: - strOperands = re.findall(r"(\.\d+|\d+\.\d+|\d+)", inpt) - if strOperands: - text += data - result = calcExpression(text) + _, uid, data = query.data.split("|") + if query.from_user.id != int(uid): + return await query.answer("Who are you??", show_alert=True, cache_time=5) + try: + text = query.message.text.split("\n")[0].strip().split("=")[0].strip() + text = '' if f"Made by @{self.me.username}" in text else text + inpt = text + query.data + result = "" + if data == "=": + result = calcExpression(text) + text = "" + elif data == "DEL": + text = text[:-1] + elif data == "AC": + text = "" + else: + dot_dot_check = re.findall(r"(\d*\.\.|\d*\.\d+\.)", inpt) + opcheck = re.findall(r"([*/\+-]{2,})", inpt) + if not dot_dot_check and not opcheck: + if strOperands := re.findall(r"(\.\d+|\d+\.\d+|\d+)", inpt): + text += data + result = calcExpression(text) - text = f"{text:<50}" - if result: - if text: - text += f"\n{result:>50}" - else: - text = result - text += f"\n\nMade by @{self.me.username}" - await query.message.edit_msg( - text=text, - disable_web_page_preview=True, - reply_markup=calc_btn(query.from_user.id) - ) - except Exception as error: - LOGGER.error(error) + text = f"{text:<50}" + if result: + if text: + text += f"\n{result:>50}" + else: + text = result + text += f"\n\nMade by @{self.me.username}" + await query.message.edit_msg( + text=text, + disable_web_page_preview=True, + reply_markup=calc_btn(query.from_user.id) + ) + except Exception as error: + LOGGER.error(error) @app.on_cmd("kbbi") async def kbbi_search(_, ctx: Client): @@ -494,14 +492,12 @@ async def who_is(client, message): message_out_str += f"✅ Verified: {from_user.is_verified}\n" message_out_str += f"🌐 Profile Link: Click Here\n" if message.chat.type.value in (("supergroup", "channel")): - try: + with contextlib.suppress(UserNotParticipant, ChatAdminRequired): chat_member_p = await message.chat.get_member(from_user.id) joined_date = chat_member_p.joined_date message_out_str += ( "➲Joined this Chat on: " f"{joined_date}" "\n" ) - except (UserNotParticipant, ChatAdminRequired): - pass if chat_photo := from_user.photo: local_user_photo = await client.download_media(message=chat_photo.big_file_id) buttons = [ diff --git a/misskaty/plugins/notes.py b/misskaty/plugins/notes.py index 0be1fe0d..a6ee3d88 100644 --- a/misskaty/plugins/notes.py +++ b/misskaty/plugins/notes.py @@ -69,13 +69,12 @@ async def save_notee(_, message): data = text[1].strip() if replied_message.sticker or replied_message.video_note: data = None + elif replied_message.sticker or replied_message.video_note: + data = None + elif not replied_message.text and not replied_message.caption: + data = None else: - if replied_message.sticker or replied_message.video_note: - data = None - elif not replied_message.text and not replied_message.caption: - data = None - else: - data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown + data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown if replied_message.text: _type = "text" file_id = None @@ -103,9 +102,8 @@ async def save_notee(_, message): if replied_message.voice: _type = "voice" file_id = replied_message.voice.file_id - if replied_message.reply_markup and not "~" in data: - urls = extract_urls(replied_message.reply_markup) - if urls: + if replied_message.reply_markup and "~" not in data: + if urls := extract_urls(replied_message.reply_markup): response = "\n".join([f"{name}=[{text}, {url}]" for name, text, url in urls]) data = data + response note = { @@ -149,13 +147,11 @@ async def get_one_note(self, message): data = _note.get("data") file_id = _note.get("file_id") keyb = None - if data: + if data: if findall(r"\[.+\,.+\]", data): - keyboard = extract_text_and_keyb(ikb, data) - if keyboard: + if keyboard := extract_text_and_keyb(ikb, data): data, keyb = keyboard - replied_message = message.reply_to_message - if replied_message: + if replied_message := message.reply_to_message: if replied_message.from_user.id != message.from_user.id: message = replied_message if type_ == "text": @@ -235,15 +231,14 @@ async def delete_all(_, message): _notes = await get_note_names(message.chat.id) if not _notes: return await message.reply_text("**No notes in this chat.**") - else: - keyboard = InlineKeyboardMarkup( - [ - [InlineKeyboardButton("YES, DO IT", callback_data="delete_yes"), - InlineKeyboardButton("Cancel", callback_data="delete_no") - ] + keyboard = InlineKeyboardMarkup( + [ + [InlineKeyboardButton("YES, DO IT", callback_data="delete_yes"), + InlineKeyboardButton("Cancel", callback_data="delete_no") ] - ) - await message.reply_text("**Are you sure you want to delete all the notes in this chat forever ?.**", reply_markup=keyboard) + ] + ) + await message.reply_text("**Are you sure you want to delete all the notes in this chat forever ?.**", reply_markup=keyboard) @app.on_callback_query(filters.regex("delete_(.*)")) diff --git a/misskaty/plugins/quotly.py b/misskaty/plugins/quotly.py index a672f790..ffdbe896 100644 --- a/misskaty/plugins/quotly.py +++ b/misskaty/plugins/quotly.py @@ -251,9 +251,7 @@ def isArgInt(txt) -> list: @app.on_message(filters.command(["q", "qr"]) & filters.reply) async def msg_quotly_cmd(self: Client, ctx: Message): - is_reply = False - if ctx.command[0].endswith("r"): - is_reply = True + is_reply = bool(ctx.command[0].endswith("r")) if len(ctx.text.split()) > 1: check_arg = isArgInt(ctx.command[1]) if check_arg[0]: diff --git a/misskaty/plugins/web_scraper.py b/misskaty/plugins/web_scraper.py index 37f76a71..43ac137c 100644 --- a/misskaty/plugins/web_scraper.py +++ b/misskaty/plugins/web_scraper.py @@ -1286,8 +1286,7 @@ async def kusonime_scrap(client, callback_query, strings): InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"), ) try: - init_url = data_kuso.get(link, False) - if init_url: + if init_url := data_kuso.get(link, False): await callback_query.message.edit_msg(init_url.get("ph_url"), reply_markup=keyboard) tgh = await kuso.telegraph(link, client.me.username) data_kuso[link] = {"ph_url": tgh} @@ -1369,9 +1368,7 @@ async def nodrakorddl_scrap(_, callback_query, strings): soup = BeautifulSoup(html.text, "lxml") if "/tv/" in link: result = soup.find("div", {"entry-content entry-content-single"}).find_all("p") - msg = "" - for i in result: - msg += str(f"{i}\n") + msg = "".join(str(f"{i}\n") for i in result) link = await post_to_telegraph(False, "MissKaty NoDrakor", msg) return await callback_query.message.edit_msg( strings("res_scrape").format(link=link, kl=link), reply_markup=keyboard diff --git a/misskaty/vars.py b/misskaty/vars.py index a8180b09..f66b8116 100644 --- a/misskaty/vars.py +++ b/misskaty/vars.py @@ -12,13 +12,11 @@ LOGGER = getLogger("MissKaty") dotenv.load_dotenv("config.env", override=True) -# Required ENV -API_ID = environ.get("API_ID", "") -if not API_ID: +if API_ID := environ.get("API_ID", ""): + API_ID = int(API_ID) +else: LOGGER.error("API_ID variable is missing! Exiting now") sys.exit(1) -else: - API_ID = int(API_ID) API_HASH = environ.get("API_HASH", "") if not API_HASH: LOGGER.error("API_HASH variable is missing! Exiting now") @@ -31,13 +29,12 @@ DATABASE_URI = environ.get("DATABASE_URI", "") if not DATABASE_URI: LOGGER.error("DATABASE_URI variable is missing! Exiting now") sys.exit(1) -LOG_CHANNEL = environ.get("LOG_CHANNEL", "") -if not LOG_CHANNEL: - LOGGER.error("LOG_CHANNEL variable is missing! Exiting now") - sys.exit(1) -else: +if LOG_CHANNEL := environ.get("LOG_CHANNEL", ""): LOG_CHANNEL = int(LOG_CHANNEL) +else: + LOGGER.error("LOG_CHANNEL variable is missing! Exiting now") + sys.exit(1) # Optional ENV LOG_GROUP_ID = environ.get("LOG_GROUP_ID") USER_SESSION = environ.get("USER_SESSION") diff --git a/update.py b/update.py index 99b9c88c..0145e797 100644 --- a/update.py +++ b/update.py @@ -61,7 +61,6 @@ if all([UPSTREAM_REPO_URL, UPSTREAM_REPO_BRANCH]): LOGGER.info(f"Successfully update with latest branch > {UPSTREAM_REPO_BRANCH}") except Exception as e: LOGGER.error(e) - pass else: LOGGER.warning( "UPSTREAM_REPO_URL or UPSTREAM_REPO_BRANCH is not defined, Skipping auto update"