mirror of
https://github.com/yasirarism/MissKatyPyro.git
synced 2025-12-29 17:44:50 +00:00
216 lines
6.6 KiB
Python
216 lines
6.6 KiB
Python
import re
|
|
|
|
from pyrogram import filters
|
|
|
|
from database.karma_db import (
|
|
get_karma,
|
|
get_karmas,
|
|
is_karma_on,
|
|
karma_off,
|
|
karma_on,
|
|
update_karma,
|
|
)
|
|
from misskaty import app
|
|
from misskaty.core.decorator.errors import capture_err
|
|
from misskaty.core.decorator.permissions import adminsOnly
|
|
from misskaty.helper.functions import alpha_to_int, int_to_alpha
|
|
|
|
# Name and help text of this plugin (presumably consumed by the bot's help
# menu loader -- verify against the loader).
__MODULE__ = "Karma"
__HELP__ = """
Give reputation to other people in group.

/karma_toggle [enable/disable] - Enable/Disable Karma.
/karma - View all karma from member group.
"""

# Handler groups passed to ``app.on_message(group=...)`` for the upvote and
# downvote handlers respectively.
karma_positive_group = 3
karma_negative_group = 4

# Whole-message patterns (anchored with ^...$) that count as a vote.
# The upvote pattern previously contained an empty alternative ("thx||tnx"),
# which made it match an empty string; that stray "|" has been removed.
regex_upvote = r"^(\++|\+1|thx|tnx|ty|tq|thank you|thanx|thanks|pro|cool|good|agree|makasih|👍|\+\+ .+)$"
regex_downvote = r"^(-+|-1|not cool|disagree|worst|bad|👎|-- .+)$"

# Building blocks used by the text formatting helpers below.
n = "\n"
w = " "


def bold(x):
    """Return ``x`` followed by a colon, wrapped in ``**`` markers, plus a space."""
    return f"**{x}:** "


def bold_ul(x):
    """Return ``x`` followed by a colon, wrapped in ``**--`` ... ``**--`` markers."""
    return f"**--{x}:**-- "


def mono(x):
    """Return ``x`` wrapped in backticks and terminated by a newline."""
    return f"`{x}`{n}"
|
|
|
|
|
|
def section(
    title: str,
    body: dict,
    indent: int = 2,
    underline: bool = False,
) -> str:
    """Render a titled block of ``key: value`` lines as one string.

    Args:
        title: Heading text; formatted with ``bold_ul`` when ``underline``
            is true, otherwise with ``bold``.
        body: Mapping of labels to values. A list value contributes its
            first element verbatim; any other value is formatted with
            ``mono``.
        indent: Number of ``w`` repetitions placed before each body line.
        underline: Selects the underlined heading style.

    Returns:
        The heading line followed by one line per entry of ``body``.
    """
    heading = bold_ul(title) if underline else bold(title)
    padding = w * indent
    pieces = [heading + n]
    for label, content in body.items():
        if isinstance(content, list):
            # Lists carry pre-formatted text in their first element.
            rendered = content[0] + n
        else:
            rendered = mono(content)
        pieces.append(padding + bold(label) + rendered)
    return "".join(pieces)
|
|
|
|
|
|
async def get_user_id_and_usernames(client) -> dict:
    """Map column 0 to column 3 of every user/bot row in the ``peers`` table.

    Reads ``client.storage.conn`` directly while holding
    ``client.storage.lock``. Columns 0 and 3 are presumably the peer id and
    its username -- TODO confirm against the storage schema in use.

    Args:
        client: Object exposing ``storage.lock`` (a context manager) and
            ``storage.conn`` (a DB-API connection usable as a context
            manager).

    Returns:
        ``{row[0]: row[3]}`` for rows whose type is ``user`` or ``bot`` and
        whose username is not NULL.
    """
    # Standard SQL: single quotes for string literals and IS NOT NULL.
    # Double-quoted literals only work through a legacy SQLite quirk that
    # can be disabled at compile time.
    query = (
        "SELECT * FROM peers "
        "WHERE type IN ('user', 'bot') AND username IS NOT NULL"
    )
    with client.storage.lock, client.storage.conn:
        users = client.storage.conn.execute(query).fetchall()
    return {user[0]: user[3] for user in users}
|
|
|
|
|
|
@app.on_message(
    filters.text
    & filters.group
    & filters.incoming
    & filters.reply
    & filters.regex(regex_upvote, re.IGNORECASE)
    & ~filters.via_bot
    & ~filters.bot,
    group=karma_positive_group,
)
@capture_err
async def upvote(_, message):
    """Add one karma point to the author of the replied-to message.

    Does nothing when karma is disabled for the chat, when either side of
    the exchange has no ``from_user``, when the replied-to author is a bot,
    or when a user replies to their own message.
    """
    if not await is_karma_on(message.chat.id):
        return

    recipient = message.reply_to_message.from_user
    voter = message.from_user
    if not recipient or not voter or recipient.is_bot or recipient.id == voter.id:
        return

    chat_id = message.chat.id
    recipient_id = recipient.id
    recipient_mention = recipient.mention

    record = await get_karma(chat_id, await int_to_alpha(recipient_id))
    # No stored record yet: the first upvote starts the user at 1.
    karma = record["karma"] + 1 if record else 1

    await update_karma(chat_id, await int_to_alpha(recipient_id), {"karma": karma})
    await message.reply_msg(
        f"Incremented Karma of {recipient_mention} By 1 \nTotal Points: {karma}"
    )
|
|
|
|
|
|
async def _decrement_karma(chat_id, user_id):
    """Lower ``user_id``'s karma in ``chat_id`` by one and return the new value."""
    key = await int_to_alpha(user_id)
    record = await get_karma(chat_id, key)
    # A user without a stored record counts as 0, so a first downvote yields
    # -1 (it used to yield +1, contradicting the "Decremented" reply).
    karma = (record["karma"] if record else 0) - 1
    await update_karma(chat_id, key, {"karma": karma})
    return karma


@app.on_message(
    filters.text
    & filters.group
    & filters.incoming
    & filters.reply
    & filters.regex(regex_downvote, re.IGNORECASE)
    & ~filters.via_bot
    & ~filters.bot,
    group=karma_negative_group,
)
@capture_err
async def downvote(_, message):
    """Remove one karma point from the author of the replied-to message.

    Does nothing when karma is disabled for the chat, when either side of
    the exchange has no ``from_user``, when the replied-to author is a bot,
    or when a user replies to their own message.

    The sender of the downvote also loses one point before the target does.
    """
    if not await is_karma_on(message.chat.id):
        return

    target = message.reply_to_message.from_user
    voter = message.from_user
    if not target or not voter or target.is_bot or target.id == voter.id:
        return

    chat_id = message.chat.id

    # NOTE(review): the voter is charged one point for downvoting. This
    # mirrors the previous behaviour and looks like a deliberate cost --
    # confirm it is intended.
    await _decrement_karma(chat_id, voter.id)

    karma = await _decrement_karma(chat_id, target.id)
    await message.reply_msg(
        f"Decremented Karma of {target.mention} By 1 \nTotal Points: {karma}"
    )
|
|
|
|
|
|
@app.on_message(filters.command("karma") & filters.group)
@capture_err
async def command_karma(_, message):
    """Handle ``/karma``: show one user's points or the chat's karma list.

    When the command replies to a message, the karma of that message's
    author is reported (0 if no record exists). Otherwise a list of the
    chat's karma holders, sorted from highest to lowest, is rendered with
    ``section``. Only users whose id appears in the mapping returned by
    ``get_user_id_and_usernames`` are listed.
    """
    chat_id = message.chat.id
    replied = message.reply_to_message

    if replied:
        if not replied.from_user:
            return await message.reply("Anon user has no karma.")
        record = await get_karma(chat_id, await int_to_alpha(replied.from_user.id))
        points = record["karma"] if record else 0
        await message.reply_text(f"**Total Points**: __{points}__")
        return

    m = await message.reply_msg("Analyzing Karma...")
    karma = await get_karmas(chat_id)
    if not karma:
        return await m.edit("No karma in DB for this chat.")

    msg = f"Karma list of {message.chat.title}"

    # Keep user ids as ints throughout so they can be looked up in the
    # username mapping without a str -> int round trip.
    scores = {}
    for alpha_id, record in karma.items():
        user_id = int(await alpha_to_int(alpha_id))
        scores[user_id] = record["karma"]

    try:
        userdb = await get_user_id_and_usernames(app)
    except (AttributeError, TypeError):
        # NOTE(review): the "Analyzing Karma..." placeholder is left
        # unedited on this path, as before -- consider reporting the failure.
        return

    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)

    leaderboard = {}
    listed = 0
    for user_id, points in ranked:
        # NOTE(review): "> 15" admits 16 entries; kept as-is -- confirm
        # whether 15 was intended.
        if listed > 15:
            break
        # Direct dict membership test instead of rebuilding a key list on
        # every iteration.
        if user_id not in userdb:
            continue
        leaderboard[f"@{userdb[user_id]}"] = [f"**{points}**"]
        listed += 1

    await m.edit(section(msg, leaderboard))
|
|
|
|
|
|
@app.on_message(filters.command("karma_toggle") & ~filters.private)
@adminsOnly("can_change_info")
async def captcha_state(_, message):
    """Handle ``/karma_toggle enable|disable`` for the current chat.

    Replies with the usage text unless exactly one argument is given and it
    is ``enable`` or ``disable`` (case-insensitive). Otherwise calls
    ``karma_on`` or ``karma_off`` for the chat and confirms the change.
    """
    usage = "**Usage:**\n/karma_toggle [ENABLE|DISABLE]"
    if len(message.command) != 2:
        return await message.reply_text(usage)

    chat_id = message.chat.id
    requested = message.text.split(None, 1)[1].strip().lower()

    # Each supported argument maps to its DB switch and confirmation text.
    switches = {
        "enable": (karma_on, "Enabled Karma System for this chat."),
        "disable": (karma_off, "Disabled Karma System for this chat."),
    }
    if requested not in switches:
        await message.reply_text(usage)
        return

    switch, confirmation = switches[requested]
    await switch(chat_id)
    await message.reply_text(confirmation)
|