'Refactored by Sourcery'

Co-authored-by: Sourcery AI <>
This commit is contained in:
Yasir Aris M 2024-01-15 07:33:05 +07:00 committed by GitHub
parent 2cb0518835
commit 983db69864
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 178 additions and 275 deletions

View file

@ -7,9 +7,7 @@ blacklist_filtersdb = dbname["blacklistFilters"]
async def get_blacklisted_words(chat_id: int) -> List[str]:
_filters = await blacklist_filtersdb.find_one({"chat_id": chat_id})
if not _filters:
return []
return _filters["filters"]
return [] if not _filters else _filters["filters"]
async def save_blacklist_filter(chat_id: int, word: str):

View file

@ -9,9 +9,7 @@ fedsdb = dbname["federation"]
def get_fed_info(fed_id):
get = fedsdb.find_one({"fed_id": str(fed_id)})
if get is None:
return False
return get
return False if get is None else get
async def get_fed_id(chat_id):
@ -19,12 +17,14 @@ async def get_fed_id(chat_id):
if get is None:
return False
else:
for chat_info in get.get("chat_ids", []):
if chat_info["chat_id"] == int(chat_id):
return get["fed_id"]
return False
return next(
(
get["fed_id"]
for chat_info in get.get("chat_ids", [])
if chat_info["chat_id"] == int(chat_id)
),
False,
)
async def get_feds_by_owner(owner_id):
@ -32,10 +32,9 @@ async def get_feds_by_owner(owner_id):
feds = await cursor.to_list(length=None)
if not feds:
return False
federations = [
return [
{"fed_id": fed["fed_id"], "fed_name": fed["fed_name"]} for fed in feds
]
return federations
async def transfer_owner(fed_id, current_owner_id, new_owner_id):
@ -58,10 +57,7 @@ async def set_log_chat(fed_id, log_group_id: int):
async def get_fed_name(chat_id):
get = await fedsdb.find_one(int(chat_id))
if get is None:
return False
else:
return get["fed_name"]
return False if get is None else get["fed_name"]
async def is_user_fed_owner(fed_id, user_id: int):
@ -69,19 +65,13 @@ async def is_user_fed_owner(fed_id, user_id: int):
if not getfed:
return False
owner_id = getfed["owner_id"]
if user_id == owner_id or user_id not in SUDO:
return True
else:
return False
return user_id == owner_id or user_id not in SUDO
async def search_fed_by_id(fed_id):
get = await fedsdb.find_one({"fed_id": str(fed_id)})
if get is not None:
return get
else:
return False
return get if get is not None else False
def chat_join_fed(fed_id, chat_name, chat_id):
@ -96,41 +86,26 @@ async def chat_leave_fed(chat_id):
{"chat_ids.chat_id": int(chat_id)},
{"$pull": {"chat_ids": {"chat_id": int(chat_id)}}},
)
if result.modified_count > 0:
return True
else:
return False
return result.modified_count > 0
async def user_join_fed(fed_id, user_id):
result = await fedsdb.update_one(
{"fed_id": fed_id}, {"$addToSet": {"fadmins": int(user_id)}}, upsert=True
)
if result.modified_count > 0:
return True
else:
return False
return result.modified_count > 0
async def user_demote_fed(fed_id, user_id):
result = await fedsdb.update_one(
{"fed_id": fed_id}, {"$pull": {"fadmins": int(user_id)}}
)
if result.modified_count > 0:
return True
else:
return False
return result.modified_count > 0
async def search_user_in_fed(fed_id, user_id):
getfed = await search_fed_by_id(fed_id)
if getfed is None:
return False
fadmins = getfed["fadmins"]
if user_id in fadmins:
return True
else:
return False
return False if getfed is None else user_id in getfed["fadmins"]
async def chat_id_and_names_in_fed(fed_id):

View file

@ -51,9 +51,7 @@ class UsersData:
async def get_ban_status(self, id):
user = await self.col.find_one({"_id": int(id)})
if user:
return True, user
return False, None
return (True, user) if user else (False, None)
async def get_all_users(self):
return self.col.find({})

View file

@ -24,9 +24,7 @@ LOGGER = getLogger("MissKaty")
@property
def parse_cmd(msg):
if len(msg.command) > 1:
return msg.text.split(None, 1)[1]
return None
return msg.text.split(None, 1)[1] if len(msg.command) > 1 else None
async def reply_text(

View file

@ -24,7 +24,11 @@ def extract_urls(reply_markup):
for i, row in enumerate(buttons):
for j, button in enumerate(row):
if button.url:
name = "\n~\nbutton" if i * len(row) + j + 1 == 1 else f"button{i * len(row) + j + 1}"
name = (
"\n~\nbutton"
if i * len(row) + j == 0
else f"button{i * len(row) + j + 1}"
)
urls.append((f"{name}", button.text, button.url))
return urls

View file

@ -18,7 +18,7 @@ async def kusonimeBypass(url: str):
result = {}
page = await fetch.get(url)
if page.status_code != 200:
raise Exception(f"ERROR: Hostname might be blocked by server!")
raise Exception("ERROR: Hostname might be blocked by server!")
try:
soup = BeautifulSoup(page.text, "lxml")
thumb = soup.find("div", {"class": "post-thumb"}).find("img").get("src")
@ -96,7 +96,7 @@ async def kusonimeBypass(url: str):
num += 1
result.update({"title": title, "thumb": thumb, "genre": genre, "genre_string": ", ".join(genre), "status_anime": status_anime, "season": season, "tipe": tipe, "ep": ep, "score": score, "duration": duration, "rilis": rilis, "data": data})
except Exception as e:
if len(result) != 0:
if result:
result.clear()
err = traceback.format_exc()
LOGGER.error(f"class: {e.__class__.__name_}, {err}")
@ -133,10 +133,11 @@ async def byPassPh(url: str, name: str) -> Optional[str]:
<br>
{{/data}}
""".strip()
plink = await post_to_telegraph(
False, f"{kusonime.get('title')} By {escape(name)}", chevron.render(template, kusonime)
return await post_to_telegraph(
False,
f"{kusonime.get('title')} By {escape(name)}",
chevron.render(template, kusonime),
)
return plink
class Kusonime:

View file

@ -76,11 +76,12 @@ def paginate_modules(page_n, module_dict, prefix, chat=None):
)
]
else:
pairs = pairs[modulo_page * COLUMN_SIZE : COLUMN_SIZE * (modulo_page + 1)] + [
pairs = pairs[
modulo_page * COLUMN_SIZE : COLUMN_SIZE * (modulo_page + 1)
] + [
(
EqInlineKeyboardButton(
"Back",
callback_data="{}_home({})".format(prefix, modulo_page),
"Back", callback_data=f"{prefix}_home({modulo_page})"
),
)
]

View file

@ -173,9 +173,7 @@ class Cache:
@staticmethod
def _exp_datetime(exp: float) -> Optional[datetime]:
if exp == -1.0:
return None
return datetime.utcfromtimestamp(exp)
return None if exp == -1.0 else datetime.utcfromtimestamp(exp)
def _stream(self, value: Any) -> bytes:
return pickle.dumps(value, protocol=self.PICKLE_PROTOCOL)
@ -407,10 +405,7 @@ class Cache:
def get_all(self) -> Dict[str, Any]:
"""Get all key-value pairs from the cache."""
all_data = self._con.execute("SELECT key, value FROM cache;").fetchall()
results = {}
for key, value in all_data:
results[key] = self._unstream(value)
return results
return {key: self._unstream(value) for key, value in all_data}
def clear(self) -> None:
"""Clear the cache from all values."""

View file

@ -167,12 +167,4 @@ def isValidURL(str):
# If the string is empty
# return false
if (str == None):
return False
# Return if the string
# matched the ReGex
if(re.search(p, str)):
return True
else:
return False
return False if str is None else bool((re.search(p, str)))

View file

@ -863,7 +863,6 @@ async def set_chat_photo(_, ctx: Message):
@app.on_message(filters.group & filters.command('mentionall', COMMAND_HANDLER))
async def mentionall(app: Client, msg: Message):
NUM = 4
user = await msg.chat.get_member(msg.from_user.id)
if user.status in (enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR):
total = []
@ -874,6 +873,7 @@ async def mentionall(app: Client, msg: Message):
else:
total.append(member.user.mention())
NUM = 4
for i in range(0, len(total), NUM):
message = ' '.join(total[i:i+NUM])
await app.send_message(msg.chat.id, message, message_thread_id=msg.message_thread_id)

View file

@ -126,10 +126,8 @@ async def log_file(_, ctx: Message, strings):
@app.on_message(filters.command(["donate"], COMMAND_HANDLER))
async def donate(self: Client, ctx: Message):
try:
with contextlib.suppress(ReactionInvalid):
await ctx.react(emoji="❤️")
except ReactionInvalid:
pass
try:
await ctx.reply_photo(
"https://img.yasirweb.eu.org/file/9427d61d6968b8ee4fb2f.jpg",

View file

@ -242,7 +242,12 @@ async def twitterdl(_, message):
"ts": "1691416179",
"source": "form",
}
post = await fetch.post(f"https://ssstwitter.com/id", data=data, headers=headers, follow_redirects=True)
post = await fetch.post(
"https://ssstwitter.com/id",
data=data,
headers=headers,
follow_redirects=True,
)
if post.status_code not in [200, 401]:
return await msg.edit_msg("Unknown error.")
soup = BeautifulSoup(post.text, "lxml")
@ -275,7 +280,9 @@ async def tiktokdl(_, message):
msg = await message.reply("Trying download...")
try:
r = (
await fetch.post(f"https://lovetik.com/api/ajax/search", data={"query": link})
await fetch.post(
"https://lovetik.com/api/ajax/search", data={"query": link}
)
).json()
fname = (await fetch.head(r["links"][0]["a"])).headers.get("content-disposition", "")
filename = unquote(fname.split('filename=')[1].strip('"').split('"')[0])

View file

@ -61,11 +61,11 @@ async def new_fed(self, message):
if len(message.command) < 2:
return await message.reply_msg("Please write the name of the federation!")
fednam = message.text.split(None, 1)[1]
if not fednam == "":
if fednam != "":
fed_id = str(uuid.uuid4())
fed_name = fednam
x = await fedsdb.update_one(
{"fed_id": str(fed_id)},
{"fed_id": fed_id},
{
"$set": {
"fed_name": str(fed_name),
@ -85,17 +85,13 @@ async def new_fed(self, message):
)
await message.reply_msg(
"**You have succeeded in creating a new federation!**"
"\nName: `{}`"
"\nID: `{}`"
"\n\nUse the command below to join the federation:"
"\n`/joinfed {}`".format(fed_name, fed_id, fed_id),
f"**You have succeeded in creating a new federation!**\nName: `{fed_name}`\nID: `{fed_id}`\n\nUse the command below to join the federation:\n`/joinfed {fed_id}`",
parse_mode=ParseMode.MARKDOWN,
)
try:
await app.send_message(
LOG_GROUP_ID,
"New Federation: <b>{}</b>\nID: <pre>{}</pre>".format(fed_name, fed_id),
f"New Federation: <b>{fed_name}</b>\nID: <pre>{fed_id}</pre>",
parse_mode=ParseMode.HTML,
)
except:
@ -114,7 +110,8 @@ async def del_fed(client, message):
"Federations can only be deleted by privately messaging me."
)
args = message.text.split(" ", 1)
if len(args) > 1:
if len(args) <= 1:
return await message.reply_text("What should I delete?")
is_fed_id = args[1].strip()
getinfo = await get_fed_info(is_fed_id)
if getinfo is False:
@ -123,15 +120,11 @@ async def del_fed(client, message):
fed_id = is_fed_id
else:
return await message.reply_text("Only federation owners can do this!")
else:
return await message.reply_text("What should I delete?")
is_owner = await is_user_fed_owner(fed_id, user.id)
if is_owner is False:
return await message.reply_text("Only federation owners can do this!")
await message.reply_text(
"You sure you want to delete your federation? This cannot be reverted, you will lose your entire ban list, and '{}' will be permanently lost.".format(
getinfo["fed_name"]
),
f"""You sure you want to delete your federation? This cannot be reverted, you will lose your entire ban list, and '{getinfo["fed_name"]}' will be permanently lost.""",
reply_markup=InlineKeyboardMarkup(
[
[
@ -245,10 +238,10 @@ async def fed_log(client, message):
"Send this command on the chat which you need to set as fed log channel."
)
member = await app.get_chat_member(chat.id, user.id)
if (
member.status == ChatMemberStatus.OWNER
or member.status == ChatMemberStatus.ADMINISTRATOR
):
if member.status in [
ChatMemberStatus.OWNER,
ChatMemberStatus.ADMINISTRATOR,
]:
if len(message.command) < 2:
return await message.reply_text(
"Please provide the Id of the federation with the command!"
@ -258,10 +251,7 @@ async def fed_log(client, message):
if info is False:
return await message.reply_text("This federation does not exist.")
if await is_user_fed_owner(fed_id, user.id):
if "/unsetfedlog" in message.text:
log_group_id = LOG_GROUP_ID
else:
log_group_id = chat.id
log_group_id = LOG_GROUP_ID if "/unsetfedlog" in message.text else chat.id
loged = await set_log_chat(fed_id, log_group_id)
if "/unsetfedlog" in message.text:
return await message.reply_text("log channel removed successfully.")
@ -281,21 +271,17 @@ async def fed_chat(self, message):
fed_id = await get_fed_id(chat.id)
member = await self.get_chat_member(chat.id, user.id)
if (
member.status == ChatMemberStatus.OWNER
or member.status == ChatMemberStatus.ADMINISTRATOR
):
pass
else:
if member.status not in [
ChatMemberStatus.OWNER,
ChatMemberStatus.ADMINISTRATOR,
]:
return await message.reply_text("You must be an admin to execute this command")
if not fed_id:
return await message.reply_text("This group is not in any federation!")
info = await get_fed_info(fed_id)
text = "This group is part of the following federation:"
text += "\n{} (ID: <code>{}</code>)".format(info["fed_name"], fed_id)
text = f'This group is part of the following federation:\n{info["fed_name"]} (ID: <code>{fed_id}</code>)'
await message.reply_text(text, parse_mode=ParseMode.HTML)
@ -312,12 +298,9 @@ async def join_fed(self, message):
member = await self.get_chat_member(chat.id, user.id)
fed_id = await get_fed_id(int(chat.id))
if user.id in SUDO:
pass
else:
if member.status == ChatMemberStatus.OWNER:
pass
else:
if (
user.id in SUDO or member.status != ChatMemberStatus.OWNER
) and user.id not in SUDO:
return await message.reply_text("Only group creators can use this command!")
if fed_id:
return await message.reply_text("You cannot join two federations from one chat")
@ -334,18 +317,15 @@ async def join_fed(self, message):
f"Failed to join federation! Please contact {SUPPORT_CHAT} if this problem persists!"
)
get_fedlog = getfed["log_group_id"]
if get_fedlog:
if get_fedlog := getfed["log_group_id"]:
await app.send_message(
get_fedlog,
"Chat **{}** has joined the federation **{}**".format(
chat.title, getfed["fed_name"]
),
f'Chat **{chat.title}** has joined the federation **{getfed["fed_name"]}**',
parse_mode=ParseMode.MARKDOWN,
)
await message.reply_text(
"This group has joined the federation: {}!".format(getfed["fed_name"])
f'This group has joined the federation: {getfed["fed_name"]}!'
)
else:
await message.reply_text(
@ -370,17 +350,14 @@ async def leave_fed(client, message):
member = await app.get_chat_member(chat.id, user.id)
if member.status == ChatMemberStatus.OWNER or user.id in SUDO:
if await chat_leave_fed(int(chat.id)) is True:
get_fedlog = fed_info["log_group_id"]
if get_fedlog:
if get_fedlog := fed_info["log_group_id"]:
await app.send_message(
get_fedlog,
"Chat **{}** has left the federation **{}**".format(
chat.title, fed_info["fed_name"]
),
f'Chat **{chat.title}** has left the federation **{fed_info["fed_name"]}**',
parse_mode=ParseMode.MARKDOWN,
)
await message.reply_text(
"This group has left the federation {}!".format(fed_info["fed_name"]),
f'This group has left the federation {fed_info["fed_name"]}!'
)
else:
await message.reply_text(
@ -412,9 +389,7 @@ async def fed_chat(client, message):
fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
if user.id in all_admins or user.id in SUDO:
pass
else:
if user.id not in all_admins and user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@ -602,12 +577,10 @@ async def fban_user(client, message):
if not fed_id:
return await message.reply_text("**This chat is not a part of any federation.")
info = await get_fed_info(fed_id)
fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
fed_owner = info["owner_id"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
if from_user.id in all_admins or from_user.id in SUDO:
pass
else:
if from_user.id not in all_admins and from_user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@ -660,8 +633,7 @@ async def fban_user(client, message):
try:
await app.send_message(
user.id,
f"Hello, You have been fed banned by {from_user.mention},"
+ " You can appeal for this ban by talking to him.",
f"Hello, You have been fed banned by {from_user.mention}, You can appeal for this ban by talking to him.",
)
except Exception:
pass
@ -702,12 +674,10 @@ async def funban_user(client, message):
if not fed_id:
return await message.reply_text("**This chat is not a part of any federation.")
info = await get_fed_info(fed_id)
fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
fed_owner = info["owner_id"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
if from_user.id in all_admins or from_user.id in SUDO:
pass
else:
if from_user.id not in all_admins and from_user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@ -755,8 +725,7 @@ async def funban_user(client, message):
try:
await app.send_message(
user.id,
f"Hello, You have been fed unbanned by {from_user.mention},"
+ " You can thank him for his action.",
f"Hello, You have been fed unbanned by {from_user.mention}, You can thank him for his action.",
)
except Exception:
pass
@ -838,9 +807,7 @@ async def fbroadcast_message(client, message):
fed_owner = info["owner_id"]
fed_admins = info["fadmins"]
all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
if from_user.id in all_admins or from_user.id in SUDO:
pass
else:
if from_user.id not in all_admins and from_user.id not in SUDO:
return await message.reply_text(
"You need to be a Fed Admin to use this command"
)
@ -891,12 +858,9 @@ async def del_fed_button(client, cb):
getfed = await get_fed_info(fed_id)
if getfed:
delete = fedsdb.delete_one({"fed_id": str(fed_id)})
if delete:
if delete := fedsdb.delete_one({"fed_id": str(fed_id)}):
await cb.message.edit_text(
"You have removed your Federation! Now all the Groups that are connected with `{}` do not have a Federation.".format(
getfed["fed_name"]
),
f'You have removed your Federation! Now all the Groups that are connected with `{getfed["fed_name"]}` do not have a Federation.',
parse_mode=ParseMode.MARKDOWN,
)

View file

@ -75,8 +75,7 @@ async def save_filters(_, message):
data = text[1].strip()
if replied_message.sticker or replied_message.video_note:
data = None
else:
if replied_message.sticker or replied_message.video_note:
elif replied_message.sticker or replied_message.video_note:
data = None
elif not replied_message.text and not replied_message.caption:
data = None
@ -109,9 +108,8 @@ async def save_filters(_, message):
if replied_message.voice:
_type = "voice"
file_id = replied_message.voice.file_id
if replied_message.reply_markup and not "~" in data:
urls = extract_urls(replied_message.reply_markup)
if urls:
if replied_message.reply_markup and "~" not in data:
if urls := extract_urls(replied_message.reply_markup):
response = "\n".join([f"{name}=[{text}, {url}]" for name, text, url in urls])
data = data + response
name = name.replace("_", " ")
@ -175,11 +173,9 @@ async def filters_re(_, message):
keyb = None
if data:
if re.findall(r"\[.+\,.+\]", data):
keyboard = extract_text_and_keyb(ikb, data)
if keyboard:
if keyboard := extract_text_and_keyb(ikb, data):
data, keyb = keyboard
replied_message = message.reply_to_message
if replied_message:
if replied_message := message.reply_to_message:
if text.startswith("~"):
await message.delete()
if replied_message.from_user.id != message.from_user.id:

View file

@ -204,7 +204,7 @@ async def givereact(c, m):
return await m.reply("Please add reaction after command, you can give multiple reaction too.")
if not m.reply_to_message:
return await m.reply("Please reply to the message you want to react to.")
emot = [emoji for emoji in regex.findall(r'\p{Emoji}', m.text)]
emot = list(regex.findall(r'\p{Emoji}', m.text))
try:
await m.reply_to_message.react(emoji=emot)
except ReactionInvalid:

View file

@ -93,12 +93,10 @@ async def imdblangset(_, query: CallbackQuery):
InlineButton("🗑 Remove UserSetting", f"setimdb#rm#{query.from_user.id}")
)
buttons.row(InlineButton("❌ Close", f"close#{query.from_user.id}"))
try:
with contextlib.suppress(MessageIdInvalid, MessageNotModified):
await query.message.edit_caption(
"<i>Please select available language below..</i>", reply_markup=buttons
)
except (MessageIdInvalid, MessageNotModified):
pass
@app.on_cb("setimdb")
@ -109,7 +107,7 @@ async def imdbsetlang(_, query: CallbackQuery):
_, langset = await is_imdbset(query.from_user.id)
if langset == lang:
return await query.answer(f"⚠️ Your Setting Already in ({langset})!", True)
try:
with contextlib.suppress(MessageIdInvalid, MessageNotModified):
if lang == "eng":
await add_imdbset(query.from_user.id, lang)
await query.message.edit_caption(
@ -125,8 +123,6 @@ async def imdbsetlang(_, query: CallbackQuery):
await query.message.edit_caption(
"UserSetting for IMDB has been deleted from database."
)
except (MessageIdInvalid, MessageNotModified):
pass
async def imdb_search_id(kueri, message):
@ -265,12 +261,10 @@ async def imdbcari(_, query: CallbackQuery):
del LIST_CARI[msg]
except KeyError:
return await query.message.edit_caption("⚠️ Callback Query Sudah Expired!")
try:
with contextlib.suppress(MessageIdInvalid, MessageNotModified):
await query.message.edit_caption(
"<i>🔎 Sedang mencari di Database IMDB..</i>"
)
except (MessageIdInvalid, MessageNotModified):
pass
msg = ""
buttons = InlineKeyboard(row_width=4)
with contextlib.redirect_stdout(sys.stderr):

View file

@ -84,25 +84,25 @@ def calcExpression(text):
def calc_btn(uid):
CALCULATE_BUTTONS = InlineKeyboardMarkup(
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton("DEL", callback_data=f"calc|{uid}|DEL"),
InlineKeyboardButton("AC", callback_data=f"calc|{uid}|AC"),
InlineKeyboardButton("(", callback_data=f"calc|{uid}|("),
InlineKeyboardButton(")", callback_data=f"calc|{uid}|)")
InlineKeyboardButton(")", callback_data=f"calc|{uid}|)"),
],
[
InlineKeyboardButton("7", callback_data=f"calc|{uid}|7"),
InlineKeyboardButton("8", callback_data=f"calc|{uid}|8"),
InlineKeyboardButton("9", callback_data=f"calc|{uid}|9"),
InlineKeyboardButton("÷", callback_data=f"calc|{uid}|/")
InlineKeyboardButton("÷", callback_data=f"calc|{uid}|/"),
],
[
InlineKeyboardButton("4", callback_data=f"calc|{uid}|4"),
InlineKeyboardButton("5", callback_data=f"calc|{uid}|5"),
InlineKeyboardButton("6", callback_data=f"calc|{uid}|6"),
InlineKeyboardButton("×", callback_data=f"calc|{uid}|*")
InlineKeyboardButton("×", callback_data=f"calc|{uid}|*"),
],
[
InlineKeyboardButton("1", callback_data=f"calc|{uid}|1"),
@ -115,10 +115,9 @@ def calc_btn(uid):
InlineKeyboardButton("0", callback_data=f"calc|{uid}|0"),
InlineKeyboardButton("=", callback_data=f"calc|{uid}|="),
InlineKeyboardButton("+", callback_data=f"calc|{uid}|+"),
]
],
]
)
return CALCULATE_BUTTONS
@app.on_message(filters.command(["calc", "calculate", "calculator"]))
@ -153,8 +152,7 @@ async def calc_cb(self, query):
dot_dot_check = re.findall(r"(\d*\.\.|\d*\.\d+\.)", inpt)
opcheck = re.findall(r"([*/\+-]{2,})", inpt)
if not dot_dot_check and not opcheck:
strOperands = re.findall(r"(\.\d+|\d+\.\d+|\d+)", inpt)
if strOperands:
if strOperands := re.findall(r"(\.\d+|\d+\.\d+|\d+)", inpt):
text += data
result = calcExpression(text)
@ -494,14 +492,12 @@ async def who_is(client, message):
message_out_str += f"<b>✅ Verified:</b> {from_user.is_verified}\n"
message_out_str += f"<b>🌐 Profile Link:</b> <a href='tg://user?id={from_user.id}'><b>Click Here</b></a>\n"
if message.chat.type.value in (("supergroup", "channel")):
try:
with contextlib.suppress(UserNotParticipant, ChatAdminRequired):
chat_member_p = await message.chat.get_member(from_user.id)
joined_date = chat_member_p.joined_date
message_out_str += (
"<b>➲Joined this Chat on:</b> <code>" f"{joined_date}" "</code>\n"
)
except (UserNotParticipant, ChatAdminRequired):
pass
if chat_photo := from_user.photo:
local_user_photo = await client.download_media(message=chat_photo.big_file_id)
buttons = [

View file

@ -69,8 +69,7 @@ async def save_notee(_, message):
data = text[1].strip()
if replied_message.sticker or replied_message.video_note:
data = None
else:
if replied_message.sticker or replied_message.video_note:
elif replied_message.sticker or replied_message.video_note:
data = None
elif not replied_message.text and not replied_message.caption:
data = None
@ -103,9 +102,8 @@ async def save_notee(_, message):
if replied_message.voice:
_type = "voice"
file_id = replied_message.voice.file_id
if replied_message.reply_markup and not "~" in data:
urls = extract_urls(replied_message.reply_markup)
if urls:
if replied_message.reply_markup and "~" not in data:
if urls := extract_urls(replied_message.reply_markup):
response = "\n".join([f"{name}=[{text}, {url}]" for name, text, url in urls])
data = data + response
note = {
@ -151,11 +149,9 @@ async def get_one_note(self, message):
keyb = None
if data:
if findall(r"\[.+\,.+\]", data):
keyboard = extract_text_and_keyb(ikb, data)
if keyboard:
if keyboard := extract_text_and_keyb(ikb, data):
data, keyb = keyboard
replied_message = message.reply_to_message
if replied_message:
if replied_message := message.reply_to_message:
if replied_message.from_user.id != message.from_user.id:
message = replied_message
if type_ == "text":
@ -235,7 +231,6 @@ async def delete_all(_, message):
_notes = await get_note_names(message.chat.id)
if not _notes:
return await message.reply_text("**No notes in this chat.**")
else:
keyboard = InlineKeyboardMarkup(
[
[InlineKeyboardButton("YES, DO IT", callback_data="delete_yes"),

View file

@ -251,9 +251,7 @@ def isArgInt(txt) -> list:
@app.on_message(filters.command(["q", "qr"]) & filters.reply)
async def msg_quotly_cmd(self: Client, ctx: Message):
is_reply = False
if ctx.command[0].endswith("r"):
is_reply = True
is_reply = bool(ctx.command[0].endswith("r"))
if len(ctx.text.split()) > 1:
check_arg = isArgInt(ctx.command[1])
if check_arg[0]:

View file

@ -1286,8 +1286,7 @@ async def kusonime_scrap(client, callback_query, strings):
InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"),
)
try:
init_url = data_kuso.get(link, False)
if init_url:
if init_url := data_kuso.get(link, False):
await callback_query.message.edit_msg(init_url.get("ph_url"), reply_markup=keyboard)
tgh = await kuso.telegraph(link, client.me.username)
data_kuso[link] = {"ph_url": tgh}
@ -1369,9 +1368,7 @@ async def nodrakorddl_scrap(_, callback_query, strings):
soup = BeautifulSoup(html.text, "lxml")
if "/tv/" in link:
result = soup.find("div", {"entry-content entry-content-single"}).find_all("p")
msg = ""
for i in result:
msg += str(f"{i}\n")
msg = "".join(str(f"{i}\n") for i in result)
link = await post_to_telegraph(False, "MissKaty NoDrakor", msg)
return await callback_query.message.edit_msg(
strings("res_scrape").format(link=link, kl=link), reply_markup=keyboard

View file

@ -12,13 +12,11 @@ LOGGER = getLogger("MissKaty")
dotenv.load_dotenv("config.env", override=True)
# Required ENV
API_ID = environ.get("API_ID", "")
if not API_ID:
if API_ID := environ.get("API_ID", ""):
API_ID = int(API_ID)
else:
LOGGER.error("API_ID variable is missing! Exiting now")
sys.exit(1)
else:
API_ID = int(API_ID)
API_HASH = environ.get("API_HASH", "")
if not API_HASH:
LOGGER.error("API_HASH variable is missing! Exiting now")
@ -31,13 +29,12 @@ DATABASE_URI = environ.get("DATABASE_URI", "")
if not DATABASE_URI:
LOGGER.error("DATABASE_URI variable is missing! Exiting now")
sys.exit(1)
LOG_CHANNEL = environ.get("LOG_CHANNEL", "")
if not LOG_CHANNEL:
LOGGER.error("LOG_CHANNEL variable is missing! Exiting now")
sys.exit(1)
else:
if LOG_CHANNEL := environ.get("LOG_CHANNEL", ""):
LOG_CHANNEL = int(LOG_CHANNEL)
else:
LOGGER.error("LOG_CHANNEL variable is missing! Exiting now")
sys.exit(1)
# Optional ENV
LOG_GROUP_ID = environ.get("LOG_GROUP_ID")
USER_SESSION = environ.get("USER_SESSION")

View file

@ -61,7 +61,6 @@ if all([UPSTREAM_REPO_URL, UPSTREAM_REPO_BRANCH]):
LOGGER.info(f"Successfully update with latest branch > {UPSTREAM_REPO_BRANCH}")
except Exception as e:
LOGGER.error(e)
pass
else:
LOGGER.warning(
"UPSTREAM_REPO_URL or UPSTREAM_REPO_BRANCH is not defined, Skipping auto update"