diff --git a/database/blacklist_db.py b/database/blacklist_db.py
index d05ec9cc..c7524aee 100644
--- a/database/blacklist_db.py
+++ b/database/blacklist_db.py
@@ -7,9 +7,7 @@ blacklist_filtersdb = dbname["blacklistFilters"]
async def get_blacklisted_words(chat_id: int) -> List[str]:
_filters = await blacklist_filtersdb.find_one({"chat_id": chat_id})
- if not _filters:
- return []
- return _filters["filters"]
+ return [] if not _filters else _filters["filters"]
async def save_blacklist_filter(chat_id: int, word: str):
diff --git a/database/feds_db.py b/database/feds_db.py
index 7a26aef3..f5a19649 100644
--- a/database/feds_db.py
+++ b/database/feds_db.py
@@ -9,9 +9,7 @@ fedsdb = dbname["federation"]
def get_fed_info(fed_id):
get = fedsdb.find_one({"fed_id": str(fed_id)})
- if get is None:
- return False
- return get
+ return False if get is None else get
async def get_fed_id(chat_id):
@@ -19,12 +17,14 @@ async def get_fed_id(chat_id):
if get is None:
return False
- else:
- for chat_info in get.get("chat_ids", []):
- if chat_info["chat_id"] == int(chat_id):
- return get["fed_id"]
-
- return False
+ return next(
+ (
+ get["fed_id"]
+ for chat_info in get.get("chat_ids", [])
+ if chat_info["chat_id"] == int(chat_id)
+ ),
+ False,
+ )
async def get_feds_by_owner(owner_id):
@@ -32,10 +32,9 @@ async def get_feds_by_owner(owner_id):
feds = await cursor.to_list(length=None)
if not feds:
return False
- federations = [
+ return [
{"fed_id": fed["fed_id"], "fed_name": fed["fed_name"]} for fed in feds
]
- return federations
async def transfer_owner(fed_id, current_owner_id, new_owner_id):
@@ -58,10 +57,7 @@ async def set_log_chat(fed_id, log_group_id: int):
async def get_fed_name(chat_id):
get = await fedsdb.find_one(int(chat_id))
- if get is None:
- return False
- else:
- return get["fed_name"]
+ return False if get is None else get["fed_name"]
async def is_user_fed_owner(fed_id, user_id: int):
@@ -69,19 +65,13 @@ async def is_user_fed_owner(fed_id, user_id: int):
if not getfed:
return False
owner_id = getfed["owner_id"]
- if user_id == owner_id or user_id not in SUDO:
- return True
- else:
- return False
+ return user_id == owner_id or user_id not in SUDO
async def search_fed_by_id(fed_id):
get = await fedsdb.find_one({"fed_id": str(fed_id)})
- if get is not None:
- return get
- else:
- return False
+ return get if get is not None else False
def chat_join_fed(fed_id, chat_name, chat_id):
@@ -96,41 +86,26 @@ async def chat_leave_fed(chat_id):
{"chat_ids.chat_id": int(chat_id)},
{"$pull": {"chat_ids": {"chat_id": int(chat_id)}}},
)
- if result.modified_count > 0:
- return True
- else:
- return False
+ return result.modified_count > 0
async def user_join_fed(fed_id, user_id):
result = await fedsdb.update_one(
{"fed_id": fed_id}, {"$addToSet": {"fadmins": int(user_id)}}, upsert=True
)
- if result.modified_count > 0:
- return True
- else:
- return False
+ return result.modified_count > 0
async def user_demote_fed(fed_id, user_id):
result = await fedsdb.update_one(
{"fed_id": fed_id}, {"$pull": {"fadmins": int(user_id)}}
)
- if result.modified_count > 0:
- return True
- else:
- return False
+ return result.modified_count > 0
async def search_user_in_fed(fed_id, user_id):
getfed = await search_fed_by_id(fed_id)
- if getfed is None:
- return False
- fadmins = getfed["fadmins"]
- if user_id in fadmins:
- return True
- else:
- return False
+ return False if getfed is None else user_id in getfed["fadmins"]
async def chat_id_and_names_in_fed(fed_id):
diff --git a/database/users_chats_db.py b/database/users_chats_db.py
index a636e1ef..50eefa82 100644
--- a/database/users_chats_db.py
+++ b/database/users_chats_db.py
@@ -51,9 +51,7 @@ class UsersData:
async def get_ban_status(self, id):
user = await self.col.find_one({"_id": int(id)})
- if user:
- return True, user
- return False, None
+ return (True, user) if user else (False, None)
async def get_all_users(self):
return self.col.find({})
diff --git a/misskaty/core/misskaty_patch/bound/message.py b/misskaty/core/misskaty_patch/bound/message.py
index 18d487ab..935206a1 100644
--- a/misskaty/core/misskaty_patch/bound/message.py
+++ b/misskaty/core/misskaty_patch/bound/message.py
@@ -24,9 +24,7 @@ LOGGER = getLogger("MissKaty")
@property
def parse_cmd(msg):
- if len(msg.command) > 1:
- return msg.text.split(None, 1)[1]
- return None
+ return msg.text.split(None, 1)[1] if len(msg.command) > 1 else None
async def reply_text(
diff --git a/misskaty/helper/functions.py b/misskaty/helper/functions.py
index b8923e51..84ce56af 100644
--- a/misskaty/helper/functions.py
+++ b/misskaty/helper/functions.py
@@ -24,7 +24,11 @@ def extract_urls(reply_markup):
for i, row in enumerate(buttons):
for j, button in enumerate(row):
if button.url:
- name = "\n~\nbutton" if i * len(row) + j + 1 == 1 else f"button{i * len(row) + j + 1}"
+ name = (
+ "\n~\nbutton"
+ if i * len(row) + j == 0
+ else f"button{i * len(row) + j + 1}"
+ )
urls.append((f"{name}", button.text, button.url))
return urls
diff --git a/misskaty/helper/kuso_utils.py b/misskaty/helper/kuso_utils.py
index 1c9fc9f8..91daa1af 100644
--- a/misskaty/helper/kuso_utils.py
+++ b/misskaty/helper/kuso_utils.py
@@ -18,7 +18,7 @@ async def kusonimeBypass(url: str):
result = {}
page = await fetch.get(url)
if page.status_code != 200:
- raise Exception(f"ERROR: Hostname might be blocked by server!")
+ raise Exception("ERROR: Hostname might be blocked by server!")
try:
soup = BeautifulSoup(page.text, "lxml")
thumb = soup.find("div", {"class": "post-thumb"}).find("img").get("src")
@@ -96,7 +96,7 @@ async def kusonimeBypass(url: str):
num += 1
result.update({"title": title, "thumb": thumb, "genre": genre, "genre_string": ", ".join(genre), "status_anime": status_anime, "season": season, "tipe": tipe, "ep": ep, "score": score, "duration": duration, "rilis": rilis, "data": data})
except Exception as e:
- if len(result) != 0:
+ if result:
result.clear()
err = traceback.format_exc()
LOGGER.error(f"class: {e.__class__.__name_}, {err}")
@@ -133,10 +133,11 @@ async def byPassPh(url: str, name: str) -> Optional[str]:
{{/data}}
""".strip()
- plink = await post_to_telegraph(
- False, f"{kusonime.get('title')} By {escape(name)}", chevron.render(template, kusonime)
+ return await post_to_telegraph(
+ False,
+ f"{kusonime.get('title')} By {escape(name)}",
+ chevron.render(template, kusonime),
)
- return plink
class Kusonime:
diff --git a/misskaty/helper/misc.py b/misskaty/helper/misc.py
index 0efe464a..dc3a4f56 100644
--- a/misskaty/helper/misc.py
+++ b/misskaty/helper/misc.py
@@ -76,11 +76,12 @@ def paginate_modules(page_n, module_dict, prefix, chat=None):
)
]
else:
- pairs = pairs[modulo_page * COLUMN_SIZE : COLUMN_SIZE * (modulo_page + 1)] + [
- (
+ pairs = pairs[
+ modulo_page * COLUMN_SIZE : COLUMN_SIZE * (modulo_page + 1)
+ ] + [
+ (
EqInlineKeyboardButton(
- "Back",
- callback_data="{}_home({})".format(prefix, modulo_page),
+ "Back", callback_data=f"{prefix}_home({modulo_page})"
),
)
]
diff --git a/misskaty/helper/sqlite_helper.py b/misskaty/helper/sqlite_helper.py
index 3285a625..5b7fe3b7 100644
--- a/misskaty/helper/sqlite_helper.py
+++ b/misskaty/helper/sqlite_helper.py
@@ -173,9 +173,7 @@ class Cache:
@staticmethod
def _exp_datetime(exp: float) -> Optional[datetime]:
- if exp == -1.0:
- return None
- return datetime.utcfromtimestamp(exp)
+ return None if exp == -1.0 else datetime.utcfromtimestamp(exp)
def _stream(self, value: Any) -> bytes:
return pickle.dumps(value, protocol=self.PICKLE_PROTOCOL)
@@ -407,10 +405,7 @@ class Cache:
def get_all(self) -> Dict[str, Any]:
"""Get all key-value pairs from the cache."""
all_data = self._con.execute("SELECT key, value FROM cache;").fetchall()
- results = {}
- for key, value in all_data:
- results[key] = self._unstream(value)
- return results
+ return {key: self._unstream(value) for key, value in all_data}
def clear(self) -> None:
"""Clear the cache from all values."""
diff --git a/misskaty/helper/tools.py b/misskaty/helper/tools.py
index 75932999..70323f3b 100644
--- a/misskaty/helper/tools.py
+++ b/misskaty/helper/tools.py
@@ -161,18 +161,10 @@ def isValidURL(str):
"{2,256}\\.[a-z]" +
"{2,6}\\b([-a-zA-Z0-9@:%" +
"._\\+~#?&//=]*)")
-
+
# Compile the ReGex
p = re.compile(regex)
-
+
# If the string is empty
# return false
- if (str == None):
- return False
-
- # Return if the string
- # matched the ReGex
- if(re.search(p, str)):
- return True
- else:
- return False
\ No newline at end of file
+ return False if str is None else bool((re.search(p, str)))
\ No newline at end of file
diff --git a/misskaty/plugins/admin.py b/misskaty/plugins/admin.py
index 780216f7..834b3762 100644
--- a/misskaty/plugins/admin.py
+++ b/misskaty/plugins/admin.py
@@ -863,7 +863,6 @@ async def set_chat_photo(_, ctx: Message):
@app.on_message(filters.group & filters.command('mentionall', COMMAND_HANDLER))
async def mentionall(app: Client, msg: Message):
- NUM = 4
user = await msg.chat.get_member(msg.from_user.id)
if user.status in (enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR):
total = []
@@ -874,6 +873,7 @@ async def mentionall(app: Client, msg: Message):
else:
total.append(member.user.mention())
+ NUM = 4
for i in range(0, len(total), NUM):
message = ' '.join(total[i:i+NUM])
await app.send_message(msg.chat.id, message, message_thread_id=msg.message_thread_id)
diff --git a/misskaty/plugins/dev.py b/misskaty/plugins/dev.py
index 8469436b..c4628b50 100644
--- a/misskaty/plugins/dev.py
+++ b/misskaty/plugins/dev.py
@@ -126,10 +126,8 @@ async def log_file(_, ctx: Message, strings):
@app.on_message(filters.command(["donate"], COMMAND_HANDLER))
async def donate(self: Client, ctx: Message):
- try:
+ with contextlib.suppress(ReactionInvalid):
await ctx.react(emoji="❤️")
- except ReactionInvalid:
- pass
try:
await ctx.reply_photo(
"https://img.yasirweb.eu.org/file/9427d61d6968b8ee4fb2f.jpg",
diff --git a/misskaty/plugins/download_upload.py b/misskaty/plugins/download_upload.py
index c1de743f..566109c8 100644
--- a/misskaty/plugins/download_upload.py
+++ b/misskaty/plugins/download_upload.py
@@ -242,7 +242,12 @@ async def twitterdl(_, message):
"ts": "1691416179",
"source": "form",
}
- post = await fetch.post(f"https://ssstwitter.com/id", data=data, headers=headers, follow_redirects=True)
+ post = await fetch.post(
+ "https://ssstwitter.com/id",
+ data=data,
+ headers=headers,
+ follow_redirects=True,
+ )
if post.status_code not in [200, 401]:
return await msg.edit_msg("Unknown error.")
soup = BeautifulSoup(post.text, "lxml")
@@ -275,7 +280,9 @@ async def tiktokdl(_, message):
msg = await message.reply("Trying download...")
try:
r = (
- await fetch.post(f"https://lovetik.com/api/ajax/search", data={"query": link})
+ await fetch.post(
+ "https://lovetik.com/api/ajax/search", data={"query": link}
+ )
).json()
fname = (await fetch.head(r["links"][0]["a"])).headers.get("content-disposition", "")
filename = unquote(fname.split('filename=')[1].strip('"').split('"')[0])
diff --git a/misskaty/plugins/federation.py b/misskaty/plugins/federation.py
index e1c918ec..681ac5b5 100644
--- a/misskaty/plugins/federation.py
+++ b/misskaty/plugins/federation.py
@@ -61,11 +61,11 @@ async def new_fed(self, message):
if len(message.command) < 2:
return await message.reply_msg("Please write the name of the federation!")
fednam = message.text.split(None, 1)[1]
- if not fednam == "":
+ if fednam != "":
fed_id = str(uuid.uuid4())
fed_name = fednam
x = await fedsdb.update_one(
- {"fed_id": str(fed_id)},
+ {"fed_id": fed_id},
{
"$set": {
"fed_name": str(fed_name),
@@ -85,17 +85,13 @@ async def new_fed(self, message):
)
await message.reply_msg(
- "**You have succeeded in creating a new federation!**"
- "\nName: `{}`"
- "\nID: `{}`"
- "\n\nUse the command below to join the federation:"
- "\n`/joinfed {}`".format(fed_name, fed_id, fed_id),
+ f"**You have succeeded in creating a new federation!**\nName: `{fed_name}`\nID: `{fed_id}`\n\nUse the command below to join the federation:\n`/joinfed {fed_id}`",
parse_mode=ParseMode.MARKDOWN,
)
try:
await app.send_message(
LOG_GROUP_ID,
- "New Federation: {}\nID:
{}".format(fed_name, fed_id),
+ f"New Federation: {fed_name}\nID: {fed_id}",
parse_mode=ParseMode.HTML,
)
except:
@@ -114,24 +110,21 @@ async def del_fed(client, message):
"Federations can only be deleted by privately messaging me."
)
args = message.text.split(" ", 1)
- if len(args) > 1:
- is_fed_id = args[1].strip()
- getinfo = await get_fed_info(is_fed_id)
- if getinfo is False:
- return await message.reply_text("This federation does not exist.")
- if getinfo["owner_id"] == user.id or user.id not in SUDO:
- fed_id = is_fed_id
- else:
- return await message.reply_text("Only federation owners can do this!")
- else:
+ if len(args) <= 1:
return await message.reply_text("What should I delete?")
+ is_fed_id = args[1].strip()
+ getinfo = await get_fed_info(is_fed_id)
+ if getinfo is False:
+ return await message.reply_text("This federation does not exist.")
+ if getinfo["owner_id"] == user.id or user.id not in SUDO:
+ fed_id = is_fed_id
+ else:
+ return await message.reply_text("Only federation owners can do this!")
is_owner = await is_user_fed_owner(fed_id, user.id)
if is_owner is False:
return await message.reply_text("Only federation owners can do this!")
await message.reply_text(
- "You sure you want to delete your federation? This cannot be reverted, you will lose your entire ban list, and '{}' will be permanently lost.".format(
- getinfo["fed_name"]
- ),
+ f"""You sure you want to delete your federation? This cannot be reverted, you will lose your entire ban list, and '{getinfo["fed_name"]}' will be permanently lost.""",
reply_markup=InlineKeyboardMarkup(
[
[
@@ -245,10 +238,10 @@ async def fed_log(client, message):
"Send this command on the chat which you need to set as fed log channel."
)
member = await app.get_chat_member(chat.id, user.id)
- if (
- member.status == ChatMemberStatus.OWNER
- or member.status == ChatMemberStatus.ADMINISTRATOR
- ):
+ if member.status in [
+ ChatMemberStatus.OWNER,
+ ChatMemberStatus.ADMINISTRATOR,
+ ]:
if len(message.command) < 2:
return await message.reply_text(
"Please provide the Id of the federation with the command!"
@@ -258,10 +251,7 @@ async def fed_log(client, message):
if info is False:
return await message.reply_text("This federation does not exist.")
if await is_user_fed_owner(fed_id, user.id):
- if "/unsetfedlog" in message.text:
- log_group_id = LOG_GROUP_ID
- else:
- log_group_id = chat.id
+ log_group_id = LOG_GROUP_ID if "/unsetfedlog" in message.text else chat.id
loged = await set_log_chat(fed_id, log_group_id)
if "/unsetfedlog" in message.text:
return await message.reply_text("log channel removed successfully.")
@@ -281,21 +271,17 @@ async def fed_chat(self, message):
fed_id = await get_fed_id(chat.id)
member = await self.get_chat_member(chat.id, user.id)
- if (
- member.status == ChatMemberStatus.OWNER
- or member.status == ChatMemberStatus.ADMINISTRATOR
- ):
- pass
- else:
+ if member.status not in [
+ ChatMemberStatus.OWNER,
+ ChatMemberStatus.ADMINISTRATOR,
+ ]:
return await message.reply_text("You must be an admin to execute this command")
if not fed_id:
return await message.reply_text("This group is not in any federation!")
info = await get_fed_info(fed_id)
- text = "This group is part of the following federation:"
- text += "\n{} (ID: {})".format(info["fed_name"], fed_id)
-
+ text = f'This group is part of the following federation:\n{info["fed_name"]} (ID: {fed_id})'
await message.reply_text(text, parse_mode=ParseMode.HTML)
@@ -312,13 +298,10 @@ async def join_fed(self, message):
member = await self.get_chat_member(chat.id, user.id)
fed_id = await get_fed_id(int(chat.id))
- if user.id in SUDO:
- pass
- else:
- if member.status == ChatMemberStatus.OWNER:
- pass
- else:
- return await message.reply_text("Only group creators can use this command!")
+ if (
+ user.id in SUDO or member.status != ChatMemberStatus.OWNER
+ ) and user.id not in SUDO:
+ return await message.reply_text("Only group creators can use this command!")
if fed_id:
return await message.reply_text("You cannot join two federations from one chat")
args = message.text.split(" ", 1)
@@ -334,18 +317,15 @@ async def join_fed(self, message):
f"Failed to join federation! Please contact {SUPPORT_CHAT} if this problem persists!"
)
- get_fedlog = getfed["log_group_id"]
- if get_fedlog:
+ if get_fedlog := getfed["log_group_id"]:
await app.send_message(
get_fedlog,
- "Chat **{}** has joined the federation **{}**".format(
- chat.title, getfed["fed_name"]
- ),
+ f'Chat **{chat.title}** has joined the federation **{getfed["fed_name"]}**',
parse_mode=ParseMode.MARKDOWN,
)
await message.reply_text(
- "This group has joined the federation: {}!".format(getfed["fed_name"])
+ f'This group has joined the federation: {getfed["fed_name"]}!'
)
else:
await message.reply_text(
@@ -370,17 +350,14 @@ async def leave_fed(client, message):
member = await app.get_chat_member(chat.id, user.id)
if member.status == ChatMemberStatus.OWNER or user.id in SUDO:
if await chat_leave_fed(int(chat.id)) is True:
- get_fedlog = fed_info["log_group_id"]
- if get_fedlog:
+ if get_fedlog := fed_info["log_group_id"]:
await app.send_message(
get_fedlog,
- "Chat **{}** has left the federation **{}**".format(
- chat.title, fed_info["fed_name"]
- ),
+ f'Chat **{chat.title}** has left the federation **{fed_info["fed_name"]}**',
parse_mode=ParseMode.MARKDOWN,
)
await message.reply_text(
- "This group has left the federation {}!".format(fed_info["fed_name"]),
+ f'This group has left the federation {fed_info["fed_name"]}!'
)
else:
await message.reply_text(
@@ -412,9 +389,7 @@ async def fed_chat(client, message):
fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
- if user.id in all_admins or user.id in SUDO:
- pass
- else:
+ if user.id not in all_admins and user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@@ -602,12 +577,10 @@ async def fban_user(client, message):
if not fed_id:
return await message.reply_text("**This chat is not a part of any federation.")
info = await get_fed_info(fed_id)
- fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
+ fed_owner = info["owner_id"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
- if from_user.id in all_admins or from_user.id in SUDO:
- pass
- else:
+ if from_user.id not in all_admins and from_user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@@ -660,8 +633,7 @@ async def fban_user(client, message):
try:
await app.send_message(
user.id,
- f"Hello, You have been fed banned by {from_user.mention},"
- + " You can appeal for this ban by talking to him.",
+ f"Hello, You have been fed banned by {from_user.mention}, You can appeal for this ban by talking to him.",
)
except Exception:
pass
@@ -702,12 +674,10 @@ async def funban_user(client, message):
if not fed_id:
return await message.reply_text("**This chat is not a part of any federation.")
info = await get_fed_info(fed_id)
- fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
+ fed_owner = info["owner_id"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
- if from_user.id in all_admins or from_user.id in SUDO:
- pass
- else:
+ if from_user.id not in all_admins and from_user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@@ -755,8 +725,7 @@ async def funban_user(client, message):
try:
await app.send_message(
user.id,
- f"Hello, You have been fed unbanned by {from_user.mention},"
- + " You can thank him for his action.",
+ f"Hello, You have been fed unbanned by {from_user.mention}, You can thank him for his action.",
)
except Exception:
pass
@@ -838,9 +807,7 @@ async def fbroadcast_message(client, message):
fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
- if from_user.id in all_admins or from_user.id in SUDO:
- pass
- else:
+ if from_user.id not in all_admins and from_user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@@ -891,12 +858,9 @@ async def del_fed_button(client, cb):
getfed = await get_fed_info(fed_id)
if getfed:
- delete = fedsdb.delete_one({"fed_id": str(fed_id)})
- if delete:
+ if delete := fedsdb.delete_one({"fed_id": str(fed_id)}):
await cb.message.edit_text(
- "You have removed your Federation! Now all the Groups that are connected with `{}` do not have a Federation.".format(
- getfed["fed_name"]
- ),
+ f'You have removed your Federation! Now all the Groups that are connected with `{getfed["fed_name"]}` do not have a Federation.',
parse_mode=ParseMode.MARKDOWN,
)
diff --git a/misskaty/plugins/filters.py b/misskaty/plugins/filters.py
index c249d110..4c74d66d 100644
--- a/misskaty/plugins/filters.py
+++ b/misskaty/plugins/filters.py
@@ -75,13 +75,12 @@ async def save_filters(_, message):
data = text[1].strip()
if replied_message.sticker or replied_message.video_note:
data = None
+ elif replied_message.sticker or replied_message.video_note:
+ data = None
+ elif not replied_message.text and not replied_message.caption:
+ data = None
else:
- if replied_message.sticker or replied_message.video_note:
- data = None
- elif not replied_message.text and not replied_message.caption:
- data = None
- else:
- data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown
+ data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown
if replied_message.text:
_type = "text"
file_id = None
@@ -109,9 +108,8 @@ async def save_filters(_, message):
if replied_message.voice:
_type = "voice"
file_id = replied_message.voice.file_id
- if replied_message.reply_markup and not "~" in data:
- urls = extract_urls(replied_message.reply_markup)
- if urls:
+ if replied_message.reply_markup and "~" not in data:
+ if urls := extract_urls(replied_message.reply_markup):
response = "\n".join([f"{name}=[{text}, {url}]" for name, text, url in urls])
data = data + response
name = name.replace("_", " ")
@@ -175,11 +173,9 @@ async def filters_re(_, message):
keyb = None
if data:
if re.findall(r"\[.+\,.+\]", data):
- keyboard = extract_text_and_keyb(ikb, data)
- if keyboard:
+ if keyboard := extract_text_and_keyb(ikb, data):
data, keyb = keyboard
- replied_message = message.reply_to_message
- if replied_message:
+ if replied_message := message.reply_to_message:
if text.startswith("~"):
await message.delete()
if replied_message.from_user.id != message.from_user.id:
diff --git a/misskaty/plugins/fun.py b/misskaty/plugins/fun.py
index d42c0a8c..79496b80 100644
--- a/misskaty/plugins/fun.py
+++ b/misskaty/plugins/fun.py
@@ -204,7 +204,7 @@ async def givereact(c, m):
return await m.reply("Please add reaction after command, you can give multiple reaction too.")
if not m.reply_to_message:
return await m.reply("Please reply to the message you want to react to.")
- emot = [emoji for emoji in regex.findall(r'\p{Emoji}', m.text)]
+ emot = list(regex.findall(r'\p{Emoji}', m.text))
try:
await m.reply_to_message.react(emoji=emot)
except ReactionInvalid:
diff --git a/misskaty/plugins/imdb_search.py b/misskaty/plugins/imdb_search.py
index e2bd4450..6d9c4158 100644
--- a/misskaty/plugins/imdb_search.py
+++ b/misskaty/plugins/imdb_search.py
@@ -93,12 +93,10 @@ async def imdblangset(_, query: CallbackQuery):
InlineButton("🗑 Remove UserSetting", f"setimdb#rm#{query.from_user.id}")
)
buttons.row(InlineButton("❌ Close", f"close#{query.from_user.id}"))
- try:
+ with contextlib.suppress(MessageIdInvalid, MessageNotModified):
await query.message.edit_caption(
"Please select available language below..", reply_markup=buttons
)
- except (MessageIdInvalid, MessageNotModified):
- pass
@app.on_cb("setimdb")
@@ -109,7 +107,7 @@ async def imdbsetlang(_, query: CallbackQuery):
_, langset = await is_imdbset(query.from_user.id)
if langset == lang:
return await query.answer(f"⚠️ Your Setting Already in ({langset})!", True)
- try:
+ with contextlib.suppress(MessageIdInvalid, MessageNotModified):
if lang == "eng":
await add_imdbset(query.from_user.id, lang)
await query.message.edit_caption(
@@ -125,8 +123,6 @@ async def imdbsetlang(_, query: CallbackQuery):
await query.message.edit_caption(
"UserSetting for IMDB has been deleted from database."
)
- except (MessageIdInvalid, MessageNotModified):
- pass
async def imdb_search_id(kueri, message):
@@ -265,12 +261,10 @@ async def imdbcari(_, query: CallbackQuery):
del LIST_CARI[msg]
except KeyError:
return await query.message.edit_caption("⚠️ Callback Query Sudah Expired!")
- try:
+ with contextlib.suppress(MessageIdInvalid, MessageNotModified):
await query.message.edit_caption(
"🔎 Sedang mencari di Database IMDB.."
)
- except (MessageIdInvalid, MessageNotModified):
- pass
msg = ""
buttons = InlineKeyboard(row_width=4)
with contextlib.redirect_stdout(sys.stderr):
diff --git a/misskaty/plugins/misc_tools.py b/misskaty/plugins/misc_tools.py
index 5226e42b..2297358a 100644
--- a/misskaty/plugins/misc_tools.py
+++ b/misskaty/plugins/misc_tools.py
@@ -84,25 +84,25 @@ def calcExpression(text):
def calc_btn(uid):
- CALCULATE_BUTTONS = InlineKeyboardMarkup(
+ return InlineKeyboardMarkup(
[
[
InlineKeyboardButton("DEL", callback_data=f"calc|{uid}|DEL"),
InlineKeyboardButton("AC", callback_data=f"calc|{uid}|AC"),
InlineKeyboardButton("(", callback_data=f"calc|{uid}|("),
- InlineKeyboardButton(")", callback_data=f"calc|{uid}|)")
+ InlineKeyboardButton(")", callback_data=f"calc|{uid}|)"),
],
[
InlineKeyboardButton("7", callback_data=f"calc|{uid}|7"),
InlineKeyboardButton("8", callback_data=f"calc|{uid}|8"),
InlineKeyboardButton("9", callback_data=f"calc|{uid}|9"),
- InlineKeyboardButton("÷", callback_data=f"calc|{uid}|/")
+ InlineKeyboardButton("÷", callback_data=f"calc|{uid}|/"),
],
[
InlineKeyboardButton("4", callback_data=f"calc|{uid}|4"),
InlineKeyboardButton("5", callback_data=f"calc|{uid}|5"),
InlineKeyboardButton("6", callback_data=f"calc|{uid}|6"),
- InlineKeyboardButton("×", callback_data=f"calc|{uid}|*")
+ InlineKeyboardButton("×", callback_data=f"calc|{uid}|*"),
],
[
InlineKeyboardButton("1", callback_data=f"calc|{uid}|1"),
@@ -115,10 +115,9 @@ def calc_btn(uid):
InlineKeyboardButton("0", callback_data=f"calc|{uid}|0"),
InlineKeyboardButton("=", callback_data=f"calc|{uid}|="),
InlineKeyboardButton("+", callback_data=f"calc|{uid}|+"),
- ]
+ ],
]
)
- return CALCULATE_BUTTONS
@app.on_message(filters.command(["calc", "calculate", "calculator"]))
@@ -134,44 +133,43 @@ async def calculate_handler(self, ctx):
@app.on_callback_query(filters.regex("^calc"))
async def calc_cb(self, query):
- _, uid, data = query.data.split("|")
- if query.from_user.id != int(uid):
- return await query.answer("Who are you??", show_alert=True, cache_time=5)
- try:
- text = query.message.text.split("\n")[0].strip().split("=")[0].strip()
- text = '' if f"Made by @{self.me.username}" in text else text
- inpt = text + query.data
- result = ""
- if data == "=":
- result = calcExpression(text)
- text = ""
- elif data == "DEL":
- text = text[:-1]
- elif data == "AC":
- text = ""
- else:
- dot_dot_check = re.findall(r"(\d*\.\.|\d*\.\d+\.)", inpt)
- opcheck = re.findall(r"([*/\+-]{2,})", inpt)
- if not dot_dot_check and not opcheck:
- strOperands = re.findall(r"(\.\d+|\d+\.\d+|\d+)", inpt)
- if strOperands:
- text += data
- result = calcExpression(text)
+ _, uid, data = query.data.split("|")
+ if query.from_user.id != int(uid):
+ return await query.answer("Who are you??", show_alert=True, cache_time=5)
+ try:
+ text = query.message.text.split("\n")[0].strip().split("=")[0].strip()
+ text = '' if f"Made by @{self.me.username}" in text else text
+ inpt = text + query.data
+ result = ""
+ if data == "=":
+ result = calcExpression(text)
+ text = ""
+ elif data == "DEL":
+ text = text[:-1]
+ elif data == "AC":
+ text = ""
+ else:
+ dot_dot_check = re.findall(r"(\d*\.\.|\d*\.\d+\.)", inpt)
+ opcheck = re.findall(r"([*/\+-]{2,})", inpt)
+ if not dot_dot_check and not opcheck:
+ if strOperands := re.findall(r"(\.\d+|\d+\.\d+|\d+)", inpt):
+ text += data
+ result = calcExpression(text)
- text = f"{text:<50}"
- if result:
- if text:
- text += f"\n{result:>50}"
- else:
- text = result
- text += f"\n\nMade by @{self.me.username}"
- await query.message.edit_msg(
- text=text,
- disable_web_page_preview=True,
- reply_markup=calc_btn(query.from_user.id)
- )
- except Exception as error:
- LOGGER.error(error)
+ text = f"{text:<50}"
+ if result:
+ if text:
+ text += f"\n{result:>50}"
+ else:
+ text = result
+ text += f"\n\nMade by @{self.me.username}"
+ await query.message.edit_msg(
+ text=text,
+ disable_web_page_preview=True,
+ reply_markup=calc_btn(query.from_user.id)
+ )
+ except Exception as error:
+ LOGGER.error(error)
@app.on_cmd("kbbi")
async def kbbi_search(_, ctx: Client):
@@ -494,14 +492,12 @@ async def who_is(client, message):
message_out_str += f"✅ Verified: {from_user.is_verified}\n"
message_out_str += f"🌐 Profile Link: Click Here\n"
if message.chat.type.value in (("supergroup", "channel")):
- try:
+ with contextlib.suppress(UserNotParticipant, ChatAdminRequired):
chat_member_p = await message.chat.get_member(from_user.id)
joined_date = chat_member_p.joined_date
message_out_str += (
"➲Joined this Chat on: " f"{joined_date}" "\n"
)
- except (UserNotParticipant, ChatAdminRequired):
- pass
if chat_photo := from_user.photo:
local_user_photo = await client.download_media(message=chat_photo.big_file_id)
buttons = [
diff --git a/misskaty/plugins/notes.py b/misskaty/plugins/notes.py
index 0be1fe0d..a6ee3d88 100644
--- a/misskaty/plugins/notes.py
+++ b/misskaty/plugins/notes.py
@@ -69,13 +69,12 @@ async def save_notee(_, message):
data = text[1].strip()
if replied_message.sticker or replied_message.video_note:
data = None
+ elif replied_message.sticker or replied_message.video_note:
+ data = None
+ elif not replied_message.text and not replied_message.caption:
+ data = None
else:
- if replied_message.sticker or replied_message.video_note:
- data = None
- elif not replied_message.text and not replied_message.caption:
- data = None
- else:
- data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown
+ data = replied_message.text.markdown if replied_message.text else replied_message.caption.markdown
if replied_message.text:
_type = "text"
file_id = None
@@ -103,9 +102,8 @@ async def save_notee(_, message):
if replied_message.voice:
_type = "voice"
file_id = replied_message.voice.file_id
- if replied_message.reply_markup and not "~" in data:
- urls = extract_urls(replied_message.reply_markup)
- if urls:
+ if replied_message.reply_markup and "~" not in data:
+ if urls := extract_urls(replied_message.reply_markup):
response = "\n".join([f"{name}=[{text}, {url}]" for name, text, url in urls])
data = data + response
note = {
@@ -149,13 +147,11 @@ async def get_one_note(self, message):
data = _note.get("data")
file_id = _note.get("file_id")
keyb = None
- if data:
+ if data:
if findall(r"\[.+\,.+\]", data):
- keyboard = extract_text_and_keyb(ikb, data)
- if keyboard:
+ if keyboard := extract_text_and_keyb(ikb, data):
data, keyb = keyboard
- replied_message = message.reply_to_message
- if replied_message:
+ if replied_message := message.reply_to_message:
if replied_message.from_user.id != message.from_user.id:
message = replied_message
if type_ == "text":
@@ -235,15 +231,14 @@ async def delete_all(_, message):
_notes = await get_note_names(message.chat.id)
if not _notes:
return await message.reply_text("**No notes in this chat.**")
- else:
- keyboard = InlineKeyboardMarkup(
- [
- [InlineKeyboardButton("YES, DO IT", callback_data="delete_yes"),
- InlineKeyboardButton("Cancel", callback_data="delete_no")
- ]
+ keyboard = InlineKeyboardMarkup(
+ [
+ [InlineKeyboardButton("YES, DO IT", callback_data="delete_yes"),
+ InlineKeyboardButton("Cancel", callback_data="delete_no")
]
- )
- await message.reply_text("**Are you sure you want to delete all the notes in this chat forever ?.**", reply_markup=keyboard)
+ ]
+ )
+ await message.reply_text("**Are you sure you want to delete all the notes in this chat forever ?.**", reply_markup=keyboard)
@app.on_callback_query(filters.regex("delete_(.*)"))
diff --git a/misskaty/plugins/quotly.py b/misskaty/plugins/quotly.py
index a672f790..ffdbe896 100644
--- a/misskaty/plugins/quotly.py
+++ b/misskaty/plugins/quotly.py
@@ -251,9 +251,7 @@ def isArgInt(txt) -> list:
@app.on_message(filters.command(["q", "qr"]) & filters.reply)
async def msg_quotly_cmd(self: Client, ctx: Message):
- is_reply = False
- if ctx.command[0].endswith("r"):
- is_reply = True
+ is_reply = bool(ctx.command[0].endswith("r"))
if len(ctx.text.split()) > 1:
check_arg = isArgInt(ctx.command[1])
if check_arg[0]:
diff --git a/misskaty/plugins/web_scraper.py b/misskaty/plugins/web_scraper.py
index 37f76a71..43ac137c 100644
--- a/misskaty/plugins/web_scraper.py
+++ b/misskaty/plugins/web_scraper.py
@@ -1286,8 +1286,7 @@ async def kusonime_scrap(client, callback_query, strings):
InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"),
)
try:
- init_url = data_kuso.get(link, False)
- if init_url:
+ if init_url := data_kuso.get(link, False):
await callback_query.message.edit_msg(init_url.get("ph_url"), reply_markup=keyboard)
tgh = await kuso.telegraph(link, client.me.username)
data_kuso[link] = {"ph_url": tgh}
@@ -1369,9 +1368,7 @@ async def nodrakorddl_scrap(_, callback_query, strings):
soup = BeautifulSoup(html.text, "lxml")
if "/tv/" in link:
result = soup.find("div", {"entry-content entry-content-single"}).find_all("p")
- msg = ""
- for i in result:
- msg += str(f"{i}\n")
+ msg = "".join(str(f"{i}\n") for i in result)
link = await post_to_telegraph(False, "MissKaty NoDrakor", msg)
return await callback_query.message.edit_msg(
strings("res_scrape").format(link=link, kl=link), reply_markup=keyboard
diff --git a/misskaty/vars.py b/misskaty/vars.py
index a8180b09..f66b8116 100644
--- a/misskaty/vars.py
+++ b/misskaty/vars.py
@@ -12,13 +12,11 @@ LOGGER = getLogger("MissKaty")
dotenv.load_dotenv("config.env", override=True)
-# Required ENV
-API_ID = environ.get("API_ID", "")
-if not API_ID:
+if API_ID := environ.get("API_ID", ""):
+ API_ID = int(API_ID)
+else:
LOGGER.error("API_ID variable is missing! Exiting now")
sys.exit(1)
-else:
- API_ID = int(API_ID)
API_HASH = environ.get("API_HASH", "")
if not API_HASH:
LOGGER.error("API_HASH variable is missing! Exiting now")
@@ -31,13 +29,12 @@ DATABASE_URI = environ.get("DATABASE_URI", "")
if not DATABASE_URI:
LOGGER.error("DATABASE_URI variable is missing! Exiting now")
sys.exit(1)
-LOG_CHANNEL = environ.get("LOG_CHANNEL", "")
-if not LOG_CHANNEL:
- LOGGER.error("LOG_CHANNEL variable is missing! Exiting now")
- sys.exit(1)
-else:
+if LOG_CHANNEL := environ.get("LOG_CHANNEL", ""):
LOG_CHANNEL = int(LOG_CHANNEL)
+else:
+ LOGGER.error("LOG_CHANNEL variable is missing! Exiting now")
+ sys.exit(1)
# Optional ENV
LOG_GROUP_ID = environ.get("LOG_GROUP_ID")
USER_SESSION = environ.get("USER_SESSION")
diff --git a/update.py b/update.py
index 99b9c88c..0145e797 100644
--- a/update.py
+++ b/update.py
@@ -61,7 +61,6 @@ if all([UPSTREAM_REPO_URL, UPSTREAM_REPO_BRANCH]):
LOGGER.info(f"Successfully update with latest branch > {UPSTREAM_REPO_BRANCH}")
except Exception as e:
LOGGER.error(e)
- pass
else:
LOGGER.warning(
"UPSTREAM_REPO_URL or UPSTREAM_REPO_BRANCH is not defined, Skipping auto update"