MissKatyPyro/misskaty/plugins/karma.py
2023-08-03 11:01:15 +07:00

216 lines
6.6 KiB
Python

import re
from pyrogram import filters
from database.karma_db import (
get_karma,
get_karmas,
is_karma_on,
karma_off,
karma_on,
update_karma,
)
from misskaty import app
from misskaty.core.decorator.errors import capture_err
from misskaty.core.decorator.permissions import adminsOnly
from misskaty.helper.functions import alpha_to_int, int_to_alpha
__MODULE__ = "Karma"
__HELP__ = """
Give reputation to other people in group.
/karma_toggle [enable/disable] - Enable/Disable Karma.
/karma - View all karma from member group.
"""
karma_positive_group = 3
karma_negative_group = 4
regex_upvote = r"^(\+|\+\+|\+1|thx|tnx|ty|tq|thank you|thanx|thanks|pro|cool|good|agree|makasih|👍|\+\+ .+)$"
regex_downvote = r"^(-|--|-1|not cool|disagree|worst|bad|👎|-- .+)$"
n = "\n"
w = " "
bold = lambda x: f"**{x}:** "
bold_ul = lambda x: f"**--{x}:**-- "
mono = lambda x: f"`{x}`{n}"
def section(
title: str,
body: dict,
indent: int = 2,
underline: bool = False,
) -> str:
text = (bold_ul(title) + n) if underline else bold(title) + n
for key, value in body.items():
text += (
indent * w
+ bold(key)
+ ((value[0] + n) if isinstance(value, list) else mono(value))
)
return text
async def get_user_id_and_usernames(client) -> dict:
with client.storage.lock, client.storage.conn:
users = client.storage.conn.execute(
'SELECT * FROM peers WHERE type in ("user", "bot") AND username NOT null'
).fetchall()
return {user[0]: user[3] for user in users}
@app.on_message(
filters.text
& filters.group
& filters.incoming
& filters.reply
& filters.regex(regex_upvote, re.IGNORECASE)
& ~filters.via_bot
& ~filters.bot,
group=karma_positive_group,
)
@capture_err
async def upvote(_, message):
if not await is_karma_on(message.chat.id):
return
if not message.reply_to_message.from_user:
return
if not message.from_user:
return
if message.reply_to_message.from_user.is_bot:
return
if message.reply_to_message.from_user.id == message.from_user.id:
return
chat_id = message.chat.id
user_id = message.reply_to_message.from_user.id
user_mention = message.reply_to_message.from_user.mention
current_karma = await get_karma(chat_id, await int_to_alpha(user_id))
if current_karma:
current_karma = current_karma["karma"]
karma = current_karma + 1
else:
karma = 1
new_karma = {"karma": karma}
await update_karma(chat_id, await int_to_alpha(user_id), new_karma)
await message.reply_msg(
f"Incremented Karma of {user_mention} By 1 \nTotal Points: {karma}"
)
@app.on_message(
filters.text
& filters.group
& filters.incoming
& filters.reply
& filters.regex(regex_downvote, re.IGNORECASE)
& ~filters.via_bot
& ~filters.bot,
group=karma_negative_group,
)
@capture_err
async def downvote(_, message):
if not await is_karma_on(message.chat.id):
return
if not message.reply_to_message.from_user:
return
if not message.from_user:
return
if message.reply_to_message.from_user.is_bot:
return
if message.reply_to_message.from_user.id == message.from_user.id:
return
chat_id = message.chat.id
user_id = message.from_user.id
current_karma = await get_karma(chat_id, await int_to_alpha(user_id))
if current_karma:
current_karma = current_karma["karma"]
karma = current_karma - 1
else:
karma = 1
new_karma = {"karma": karma}
await update_karma(chat_id, await int_to_alpha(user_id), new_karma)
user_id = message.reply_to_message.from_user.id
user_mention = message.reply_to_message.from_user.mention
current_karma = await get_karma(chat_id, await int_to_alpha(user_id))
if current_karma:
current_karma = current_karma["karma"]
karma = current_karma - 1
else:
karma = 1
new_karma = {"karma": karma}
await update_karma(chat_id, await int_to_alpha(user_id), new_karma)
await message.reply_msg(
f"Decremented Karma of {user_mention} By 1 \nTotal Points: {karma}"
)
@app.on_message(filters.command("karma") & filters.group)
@capture_err
async def command_karma(_, message):
chat_id = message.chat.id
if not message.reply_to_message:
m = await message.reply_msg("Analyzing Karma...")
karma = await get_karmas(chat_id)
if not karma:
return await m.edit("No karma in DB for this chat.")
msg = f"Karma list of {message.chat.title}"
limit = 0
karma_dicc = {}
for i in karma:
user_id = await alpha_to_int(i)
user_karma = karma[i]["karma"]
karma_dicc[str(user_id)] = user_karma
karma_arranged = dict(
sorted(
karma_dicc.items(),
key=lambda item: item[1],
reverse=True,
)
)
if not karma_dicc:
return await m.edit("No karma in DB for this chat.")
try:
userdb = await get_user_id_and_usernames(app)
except (AttributeError, TypeError):
return
karma = {}
for user_idd, karma_count in karma_arranged.items():
if limit > 15:
break
if int(user_idd) not in list(userdb.keys()):
continue
username = userdb[int(user_idd)]
karma[f"@{username}"] = [f"**{str(karma_count)}**"]
limit += 1
await m.edit(section(msg, karma))
else:
if not message.reply_to_message.from_user:
return await message.reply("Anon user has no karma.")
user_id = message.reply_to_message.from_user.id
karma = await get_karma(chat_id, await int_to_alpha(user_id))
karma = karma["karma"] if karma else 0
await message.reply_text(f"**Total Points**: __{karma}__")
@app.on_message(filters.command("karma_toggle") & ~filters.private)
@adminsOnly("can_change_info")
async def captcha_state(_, message):
usage = "**Usage:**\n/karma_toggle [ENABLE|DISABLE]"
if len(message.command) != 2:
return await message.reply_text(usage)
chat_id = message.chat.id
state = message.text.split(None, 1)[1].strip()
state = state.lower()
if state == "enable":
await karma_on(chat_id)
await message.reply_text("Enabled Karma System for this chat.")
elif state == "disable":
await karma_off(chat_id)
await message.reply_text("Disabled Karma System for this chat.")
else:
await message.reply_text(usage)