"""Karma (reputation) handlers: vote by replying, list totals, toggle per chat."""
import re

from pyrogram import filters

from database.karma_db import (
    get_karma,
    get_karmas,
    is_karma_on,
    karma_off,
    karma_on,
    update_karma,
)
from misskaty import app
from misskaty.core.decorator.errors import capture_err
from misskaty.core.decorator.permissions import adminsOnly
from misskaty.helper.functions import alpha_to_int, int_to_alpha

__MODULE__ = "Karma"
__HELP__ = """
Give reputation to other people in group.
/karma_toggle [enable/disable] - Enable/Disable Karma.
/karma - View all karma from member group.
"""

# Handler groups so the upvote and downvote handlers are registered separately.
karma_positive_group = 3
karma_negative_group = 4

# Whole-message patterns (anchored with ^...$) that count as a vote.
# The upvote pattern previously contained "thx||tnx"; the stray empty
# alternative let the pattern match empty text, so it has been removed.
regex_upvote = r"^(\++|\+1|thx|tnx|ty|tq|thank you|thanx|thanks|pro|cool|good|agree|makasih|👍|\+\+ .+)$"
regex_downvote = r"^(-+|-1|not cool|disagree|worst|bad|👎|-- .+)$"

n = "\n"
w = " "


def bold(x):
    """Return *x* as a bold markdown label followed by a colon and a space."""
    return f"**{x}:** "


def bold_ul(x):
    """Return *x* as a bold, underlined markdown label."""
    return f"**--{x}:**-- "


def mono(x):
    """Return *x* in monospace markdown, terminated by a newline."""
    return f"`{x}`{n}"


def section(
    title: str,
    body: dict,
    indent: int = 2,
    underline: bool = False,
) -> str:
    """Render a titled key/value listing as markdown text.

    Args:
        title: Heading printed on the first line.
        body: Mapping of label -> value. A list value contributes its first
            element verbatim; any other value is rendered in monospace.
        indent: Number of spaces placed before each body line.
        underline: Use the underlined heading style when true.

    Returns:
        The heading line followed by one line per entry of *body*.
    """
    heading = bold_ul(title) if underline else bold(title)
    pad = indent * w
    rows = [heading + n]
    for key, value in body.items():
        rendered = (value[0] + n) if isinstance(value, list) else mono(value)
        rows.append(pad + bold(key) + rendered)
    return "".join(rows)


async def get_user_id_and_usernames(client) -> dict:
    """Build a lookup from the peers cached in the client's session storage.

    Reads every row of the ``peers`` table whose type is "user" or "bot" and
    whose username is set, and maps column 0 to column 3 of each row.
    NOTE(review): columns 0 and 3 are presumably the peer id and username in
    the session schema -- confirm against the storage backend in use.
    """
    with client.storage.lock, client.storage.conn:
        users = client.storage.conn.execute(
            'SELECT * FROM peers WHERE type in ("user", "bot") AND username NOT null'
        ).fetchall()
    return {user[0]: user[3] for user in users}


async def _is_countable_vote(message) -> bool:
    """Tell whether a vote message should change anyone's karma.

    A vote counts only when karma is enabled for the chat, both the voter and
    the replied-to author are known users, the author is not a bot, and the
    voter is not replying to themselves.
    """
    if not await is_karma_on(message.chat.id):
        return False
    target = message.reply_to_message.from_user
    voter = message.from_user
    if not target or not voter:
        return False
    if target.is_bot:
        return False
    return target.id != voter.id


async def _change_karma(chat_id, user_id, delta: int) -> int:
    """Add *delta* to a user's stored karma and return the new total.

    A user without a stored record is treated as having 0 points.
    """
    key = await int_to_alpha(user_id)
    record = await get_karma(chat_id, key)
    total = (record["karma"] if record else 0) + delta
    await update_karma(chat_id, key, {"karma": total})
    return total


@app.on_message(
    filters.text
    & filters.group
    & filters.incoming
    & filters.reply
    & filters.regex(regex_upvote, re.IGNORECASE)
    & ~filters.via_bot
    & ~filters.bot,
    group=karma_positive_group,
)
@capture_err
async def upvote(_, message):
    """Give one karma point to the author of the replied-to message."""
    if not await _is_countable_vote(message):
        return
    target = message.reply_to_message.from_user
    karma = await _change_karma(message.chat.id, target.id, 1)
    await message.reply_msg(
        f"Incremented Karma of {target.mention} By 1 \nTotal Points: {karma}"
    )


@app.on_message(
    filters.text
    & filters.group
    & filters.incoming
    & filters.reply
    & filters.regex(regex_downvote, re.IGNORECASE)
    & ~filters.via_bot
    & ~filters.bot,
    group=karma_negative_group,
)
@capture_err
async def downvote(_, message):
    """Take one karma point from the author of the replied-to message.

    A user with no stored karma now ends up at -1; previously the missing
    record case stored (and reported) +1 for a downvote.
    """
    if not await _is_countable_vote(message):
        return
    chat_id = message.chat.id
    # NOTE(review): the voter is also charged one point for downvoting. This
    # is kept from the existing behaviour -- confirm it is intended.
    await _change_karma(chat_id, message.from_user.id, -1)
    target = message.reply_to_message.from_user
    karma = await _change_karma(chat_id, target.id, -1)
    await message.reply_msg(
        f"Decremented Karma of {target.mention} By 1 \nTotal Points: {karma}"
    )


@app.on_message(filters.command("karma") & filters.group)
@capture_err
async def command_karma(_, message):
    """Show karma: the chat's ranking, or one user's total when used as a reply."""
    chat_id = message.chat.id
    replied = message.reply_to_message

    if replied:
        if not replied.from_user:
            return await message.reply("Anon user has no karma.")
        record = await get_karma(chat_id, await int_to_alpha(replied.from_user.id))
        karma = record["karma"] if record else 0
        await message.reply_text(f"**Total Points**: __{karma}__")
        return

    m = await message.reply_msg("Analyzing Karma...")
    stored = await get_karmas(chat_id)
    if not stored:
        return await m.edit("No karma in DB for this chat.")
    msg = f"Karma list of {message.chat.title}"

    points_by_user = {}
    for alpha_id in stored:
        points_by_user[int(await alpha_to_int(alpha_id))] = stored[alpha_id]["karma"]
    ranked = sorted(
        points_by_user.items(),
        key=lambda item: item[1],
        reverse=True,
    )

    try:
        userdb = await get_user_id_and_usernames(app)
    except (AttributeError, TypeError):
        # Peer lookup is unavailable; the placeholder message is left as is.
        return

    max_rows = 16  # existing cap: at most 16 users are listed
    shown = 0
    karma = {}
    for user_id, karma_count in ranked:
        if shown >= max_rows:
            break
        # Only users with a cached username can be listed.
        if user_id not in userdb:
            continue
        karma[f"@{userdb[user_id]}"] = [f"**{karma_count}**"]
        shown += 1
    await m.edit(section(msg, karma))


@app.on_message(filters.command("karma_toggle") & ~filters.private)
@adminsOnly("can_change_info")
async def captcha_state(_, message):
    """Enable or disable the karma system for the chat (``/karma_toggle``).

    Replies with a usage hint unless exactly one argument, "enable" or
    "disable" (case-insensitive), is given.
    """
    usage = "**Usage:**\n/karma_toggle [ENABLE|DISABLE]"
    if len(message.command) != 2:
        return await message.reply_text(usage)
    chat_id = message.chat.id
    # Use the parsed command argument: message.text is None when the command
    # is sent as a media caption, which made message.text.split() raise.
    state = message.command[1].strip().lower()
    if state == "enable":
        await karma_on(chat_id)
        await message.reply_text("Enabled Karma System for this chat.")
    elif state == "disable":
        await karma_off(chat_id)
        await message.reply_text("Disabled Karma System for this chat.")
    else:
        await message.reply_text(usage)