MissKatyPyro/misskaty/plugins/admin.py

699 lines
26 KiB
Python

import asyncio
import re
from logging import getLogger
from time import time

from pyrogram import enums, filters
from pyrogram.errors import ChatAdminRequired, FloodWait
from pyrogram.types import ChatPermissions, ChatPrivileges

from database.warn_db import add_warn, get_warn, remove_warns
from misskaty import app
from misskaty.core.decorator.errors import capture_err
from misskaty.core.decorator.permissions import (
    admins_in_chat,
    adminsOnly,
    list_admins,
    member_permissions,
)
from misskaty.core.decorator.ratelimiter import ratelimiter
from misskaty.core.keyboard import ikb
from misskaty.core.message_utils import kirimPesan
from misskaty.helper.functions import (
    extract_user,
    extract_user_and_reason,
    int_to_alpha,
    time_converter,
)
from misskaty.vars import COMMAND_HANDLER, SUDO
# Module-level logger, named after this module.
LOGGER = getLogger(__name__)
# Plugin display name and command summary.
# NOTE(review): presumably consumed by the bot's help menu — confirm against the loader.
__MODULE__ = "Admin"
__HELP__ = """
/ban - Ban A User From A Group
/dban - Delete the replied message banning its sender
/tban - Ban A User For Specific Time
/unban - Unban A User
/listban - Ban a user from groups listed in a message
/listunban - Unban a user from groups listed in a message
/warn - Warn A User
/dwarn - Delete the replied message warning its sender
/rmwarns - Remove All Warning of A User
/warns - Show Warning Of A User
/kick - Kick A User
/dkick - Delete the replied message kicking its sender
/purge - Purge Messages
/purge [n] - Purge "n" number of messages from replied message
/del - Delete Replied Message
/promote - Promote A Member
/fullpromote - Promote A Member With All Rights
/demote - Demote A Member
/pin - Pin A Message
/mute - Mute A User
/tmute - Mute A User For Specific Time
/unmute - Unmute A User
/ban_ghosts - Ban Deleted Accounts
/report | @admins | @admin - Report A Message To Admins.
"""
# Admin cache reload
@app.on_chat_member_updated()
async def admin_cache_func(_, cmu):
    """Rebuild the cached administrator list of a chat.

    Runs on every chat-member update, but only refreshes the cache when the
    previous member state carries ``promoted_by``. The refresh is best
    effort: a failure is logged and otherwise ignored.

    Args:
        _: The client (unused).
        cmu: The chat-member-updated event.
    """
    if not (cmu.old_chat_member and cmu.old_chat_member.promoted_by):
        return
    try:
        admin_ids = [
            member.user.id
            async for member in app.get_chat_members(
                cmu.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS
            )
        ]
        admins_in_chat[cmu.chat.id] = {
            "last_updated_at": time(),
            "data": admin_ids,
        }
        LOGGER.info(f"Updated admin cache for {cmu.chat.id} [{cmu.chat.title}]")
    except Exception as err:
        # Keep the best-effort behaviour, but leave a trace instead of
        # silently swallowing the error (and never swallow CancelledError).
        LOGGER.warning("Failed to update admin cache for %s: %s", cmu.chat.id, err)
# Purge CMD
@app.on_message(filters.command("purge", COMMAND_HANDLER) & filters.group)
@adminsOnly("can_delete_messages")
@ratelimiter
async def purge(_, message):
    """Delete a run of messages starting at the replied-to message.

    The command message itself is deleted first. Without an argument the
    ids from the replied-to message up to (but excluding) the command
    message are deleted; with a numeric argument ``n`` at most ``n`` ids
    are deleted. Ids are sent to Telegram in batches of 100. Any error is
    reported back to the chat.
    """
    if not message.from_user:
        return
    try:
        start_msg = message.reply_to_message
        await message.delete()
        if not start_msg:
            return await message.reply_text("Reply to a message to purge from.")
        args = message.command
        stop_id = message.id
        if len(args) > 1 and args[1].isdigit():
            stop_id = min(start_msg.id + int(args[1]), message.id)
        chat_id = message.chat.id
        id_span = range(start_msg.id, stop_id)
        del_total = 0
        # Max message deletion limit is 100 per request, so walk in slices.
        for offset in range(0, len(id_span), 100):
            batch = list(id_span[offset : offset + 100])
            await app.delete_messages(
                chat_id=chat_id,
                message_ids=batch,
                revoke=True,  # For both sides
            )
            del_total += len(batch)
        await message.reply(f"Successfully deleted {del_total} messages..")
    except Exception as err:
        await message.reply(f"ERR: {err}")
# Kick members
@app.on_message(filters.command(["kick", "dkick"], COMMAND_HANDLER) & filters.group)
@adminsOnly("can_restrict_members")
@ratelimiter
async def kickFunc(client, message):
    """Kick a user: ban them, then unban one second later.

    ``/dkick`` additionally deletes the replied-to message when there is
    one. The bot itself, SUDO users and chat admins are refused.

    Args:
        client: The running client; ``client.me.id`` is the bot's own id.
        message: The command message.
    """
    if not message.from_user:
        return
    user_id, reason = await extract_user_and_reason(message)
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't kick myself, i can leave if you want.")
    if user_id in SUDO:
        return await message.reply_text("Wow, you wanna kick my owner?")
    if user_id in (await list_admins(message.chat.id)):
        return await message.reply_text("Lol, it's crazy if i can kick an admin.")
    mention = (await app.get_users(user_id)).mention
    msg = f"""
**Kicked User:** {mention}
**Kicked By:** {message.from_user.mention if message.from_user else 'Anon'}
**Reason:** {reason or '-'}"""
    # /dkick can be issued with an explicit user and no reply; only delete
    # when a replied-to message actually exists.
    if message.command[0][0] == "d" and message.reply_to_message:
        await message.reply_to_message.delete()
    try:
        await message.chat.ban_member(user_id)
        await message.reply_text(msg)
        # A kick is a ban that is lifted right away.
        await asyncio.sleep(1)
        await message.chat.unban_member(user_id)
    except ChatAdminRequired:
        await message.reply("Please give me ban permission to ban user in this group.")
# Ban/DBan/TBan User
@app.on_message(filters.command(["ban", "dban", "tban"], COMMAND_HANDLER) & filters.group)
@adminsOnly("can_restrict_members")
@ratelimiter
async def banFunc(client, message):
    """Ban a user, optionally deleting their message or for a limited time.

    * ``/ban``  - permanent ban, replies with an "Unban" button.
    * ``/dban`` - like ``/ban`` and deletes the replied-to message if any.
    * ``/tban`` - temporary ban; the first word after the user is the time
      value, the rest is the reason.

    Args:
        client: The running client; ``client.me.id`` is the bot's own id.
        message: The command message.
    """
    if not message.from_user:
        return
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't ban myself, i can leave if you want.")
    if user_id in SUDO:
        return await message.reply_text("You Wanna Ban The Elevated One?, RECONSIDER!")
    if user_id in (await list_admins(message.chat.id)):
        return await message.reply_text("I can't ban an admin, You know the rules, so do i.")
    try:
        mention = (await app.get_users(user_id)).mention
    except IndexError:
        # Lookup failed (e.g. the target is a sender chat): fall back to
        # the chat title of the replied-to message.
        mention = message.reply_to_message.sender_chat.title if message.reply_to_message else "Anon"
    msg = f"**Banned User:** {mention}\n" f"**Banned By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
    # /dban can be issued without a reply; only delete when there is one.
    if message.command[0][0] == "d" and message.reply_to_message:
        await message.reply_to_message.delete()
    if message.command[0] == "tban":
        if not reason:
            # Without this guard ``reason.split`` fails on None/empty text.
            return await message.reply_text("Provide a time value for the temporary ban.")
        split = reason.split(None, 1)
        time_value = split[0]
        temp_reason = split[1] if len(split) > 1 else ""
        temp_ban = await time_converter(message, time_value)
        msg += f"**Banned For:** {time_value}\n"
        if temp_reason:
            msg += f"**Reason:** {temp_reason}"
        try:
            # The numeric part (everything but the last character) may
            # have at most two digits.
            if len(time_value[:-1]) < 3:
                await message.chat.ban_member(user_id, until_date=temp_ban)
                await message.reply_text(msg)
            else:
                await message.reply_text("You can't use more than 99")
        except AttributeError:
            # NOTE(review): presumably raised when time_converter did not
            # return a usable date — confirm against the helper.
            pass
        return
    if reason:
        msg += f"**Reason:** {reason}"
    keyboard = ikb({"🚨 Unban 🚨": f"unban_{user_id}"})
    try:
        await message.chat.ban_member(user_id)
        await message.reply_text(msg, reply_markup=keyboard)
    except Exception as err:
        await message.reply(f"ERROR: {err}")
# Unban members
@app.on_message(filters.command("unban", COMMAND_HANDLER) & filters.group)
@adminsOnly("can_restrict_members")
@ratelimiter
async def unban_func(_, message):
    """Unban a user given as an argument or as the author of the reply.

    Replies sent on behalf of a chat other than this group are rejected
    with "You cannot unban a channel".
    """
    if not message.from_user:
        return
    # we don't need reasons for unban, also, we
    # don't need to get "text_mention" entity, because
    # normal users won't get text_mention if the user
    # they want to unban is not in the group.
    reply = message.reply_to_message
    # Compare ids: comparing the Chat object itself with an int is never
    # equal, which made this check fire for every sender-chat reply.
    if reply and reply.sender_chat and reply.sender_chat.id != message.chat.id:
        return await message.reply_text("You cannot unban a channel")
    if len(message.command) == 2:
        user = message.text.split(None, 1)[1]
    elif len(message.command) == 1 and reply and reply.from_user:
        user = reply.from_user.id
    else:
        # Also reached when the reply has no user author.
        return await message.reply_text("Provide a username or reply to a user's message to unban.")
    await message.chat.unban_member(user)
    umention = (await app.get_users(user)).mention
    await message.reply_text(f"Unbanned! {umention}")
# Ban users listed in a message
@app.on_message(filters.user(SUDO) & filters.command("listban", COMMAND_HANDLER) & filters.group)
@ratelimiter
async def list_ban_(c, message):
    """Ban a user from every group mentioned (``@username``) in a linked message.

    Expected arguments: a user, a ``t.me`` message link and a reason. The
    linked message's text is scanned for ``@username`` tokens and the user
    is banned in each of them. Only chats where the ban call succeeded are
    counted in the summary.

    Args:
        c: The running client; ``c.me.id`` is the bot's own id.
        message: The command message.
    """
    if not message.from_user:
        return
    userid, msglink_reason = await extract_user_and_reason(message)
    if not userid or not msglink_reason:
        return await message.reply_text("Provide a userid/username along with message link and reason to list-ban")
    if len(msglink_reason.split(" ")) == 1:  # only the message link, no reason
        return await message.reply_text("You must provide a reason to list-ban")
    # separate the message link from the reason
    lreason = msglink_reason.split()
    messagelink, reason = lreason[0], " ".join(lreason[1:])
    if not re.search(r"(https?://)?t(elegram)?\.me/\w+/\d+", messagelink):  # validate link
        return await message.reply_text("Invalid message link provided")
    if userid == c.me.id:
        return await message.reply_text("I can't ban myself.")
    if userid in SUDO:
        return await message.reply_text("You Wanna Ban The Elevated One?, RECONSIDER!")
    splitted = messagelink.split("/")
    uname, mid = splitted[-2], int(splitted[-1])
    m = await message.reply_text("`Banning User from multiple groups. This may take some time`")
    try:
        msgtext = (await app.get_messages(uname, mid)).text
        gusernames = re.findall(r"@\w+", msgtext)
    except Exception:
        return await m.edit_text("Could not get group usernames")
    count = 0
    for username in gusernames:
        try:
            await app.ban_chat_member(username.strip("@"), userid)
        except FloodWait as e:
            # Rate limited: wait it out, but do not count this chat since
            # the ban did not go through.
            await asyncio.sleep(e.value)
            continue
        except Exception:
            continue
        count += 1
        # Small pause between successful bans to stay under rate limits.
        await asyncio.sleep(1)
    mention = (await app.get_users(userid)).mention
    msg = f"""
**List-Banned User:** {mention}
**Banned User ID:** `{userid}`
**Admin:** {message.from_user.mention}
**Affected chats:** `{count}`
**Reason:** {reason}
"""
    await m.edit_text(msg)
# Unban users listed in a message
@app.on_message(filters.user(SUDO) & filters.command("listunban", COMMAND_HANDLER) & filters.group)
@ratelimiter
async def list_unban_(c, message):
    """Unban a user from every group mentioned (``@username``) in a linked message.

    Expected arguments: a user and a ``t.me`` message link. The linked
    message's text is scanned for ``@username`` tokens and the user is
    unbanned in each of them. Only chats where the unban call succeeded
    are counted in the summary.

    Args:
        c: The running client (unused).
        message: The command message.
    """
    if not message.from_user:
        return
    userid, msglink = await extract_user_and_reason(message)
    if not userid or not msglink:
        return await message.reply_text("Provide a userid/username along with message link to list-unban")
    if not re.search(r"(https?://)?t(elegram)?\.me/\w+/\d+", msglink):  # validate link
        return await message.reply_text("Invalid message link provided")
    splitted = msglink.split("/")
    uname, mid = splitted[-2], int(splitted[-1])
    m = await message.reply_text("`Unbanning User from multiple groups. This may take some time`")
    try:
        msgtext = (await app.get_messages(uname, mid)).text
        gusernames = re.findall(r"@\w+", msgtext)
    except Exception:
        return await m.edit_text("Could not get the group usernames")
    count = 0
    for username in gusernames:
        try:
            await app.unban_chat_member(username.strip("@"), userid)
        except FloodWait as e:
            # Pyrogram v2 exposes the wait time as ``value`` (``x`` was v1).
            # The unban did not go through, so this chat is not counted.
            await asyncio.sleep(e.value)
            continue
        except Exception:
            continue
        count += 1
        # Small pause between successful unbans to stay under rate limits.
        await asyncio.sleep(1)
    mention = (await app.get_users(userid)).mention
    msg = f"""
**List-Unbanned User:** {mention}
**Unbanned User ID:** `{userid}`
**Admin:** {message.from_user.mention}
**Affected chats:** `{count}`
"""
    await m.edit_text(msg)
# Delete messages
@app.on_message(filters.command("del", COMMAND_HANDLER) & filters.group)
@adminsOnly("can_delete_messages")
@ratelimiter
async def deleteFunc(_, message):
    """Delete the replied-to message together with the command message."""
    if not message.from_user:
        return
    if not message.reply_to_message:
        return await message.reply_text("Reply To A Message To Delete It")
    try:
        await message.reply_to_message.delete()
        await message.delete()
    except Exception:
        # ``Exception`` rather than a bare except, so task cancellation and
        # interpreter shutdown are not swallowed.
        await message.reply("Please give me delete message permission.")
# Promote Members
@app.on_message(filters.command(["promote", "fullpromote"], COMMAND_HANDLER) & filters.group)
@adminsOnly("can_promote_members")
@ratelimiter
async def promoteFunc(client, message):
    """Promote a member to administrator.

    ``/fullpromote`` hands over every right the bot itself holds;
    ``/promote`` grants only invite, delete, manage-chat and video-chat
    rights (again limited to what the bot holds).

    Args:
        client: The running client; ``client.me.id`` is the bot's own id.
        message: The command message.
    """
    if not message.from_user:
        return
    try:
        user_id = await extract_user(message)
        umention = (await app.get_users(user_id)).mention
    except Exception:
        return await message.reply("⚠️ Invalid userid/username")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't promote myself.")
    # In Pyrogram v2 the admin rights live on ``ChatMember.privileges``
    # (None when the bot is not an admin), not on the member object itself.
    bot = (await app.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_promote_members:
        return await message.reply_text("I don't have enough permissions")
    if message.command[0][0] == "f":
        await message.chat.promote_member(
            user_id=user_id,
            privileges=ChatPrivileges(
                can_change_info=bot.can_change_info,
                can_invite_users=bot.can_invite_users,
                can_delete_messages=bot.can_delete_messages,
                can_restrict_members=bot.can_restrict_members,
                can_pin_messages=bot.can_pin_messages,
                can_promote_members=bot.can_promote_members,
                can_manage_chat=bot.can_manage_chat,
                can_manage_video_chats=bot.can_manage_video_chats,
            ),
        )
        return await message.reply_text(f"Fully Promoted! {umention}")
    await message.chat.promote_member(
        user_id=user_id,
        privileges=ChatPrivileges(
            can_change_info=False,
            can_invite_users=bot.can_invite_users,
            can_delete_messages=bot.can_delete_messages,
            can_restrict_members=False,
            can_pin_messages=False,
            can_promote_members=False,
            can_manage_chat=bot.can_manage_chat,
            can_manage_video_chats=bot.can_manage_video_chats,
        ),
    )
    await message.reply_text(f"Promoted! {umention}")
# Demote Member
@app.on_message(filters.command("demote", COMMAND_HANDLER) & filters.group)
@adminsOnly("can_promote_members")
@ratelimiter
async def demote(client, message):
    """Demote an administrator by revoking every admin right.

    The bot itself and SUDO users are refused.

    Args:
        client: The running client; ``client.me.id`` is the bot's own id.
        message: The command message.
    """
    if not message.from_user:
        return
    user_id = await extract_user(message)
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't demote myself.")
    if user_id in SUDO:
        return await message.reply_text("You wanna demote the elevated one?")
    # Pyrogram v2 takes the rights as one ``ChatPrivileges`` object.
    await message.chat.promote_member(
        user_id=user_id,
        privileges=ChatPrivileges(
            can_change_info=False,
            can_invite_users=False,
            can_delete_messages=False,
            can_restrict_members=False,
            can_pin_messages=False,
            can_promote_members=False,
            can_manage_chat=False,
            can_manage_video_chats=False,
        ),
    )
    umention = (await app.get_users(user_id)).mention
    await message.reply_text(f"Demoted! {umention}")
# Pin Messages
@app.on_message(filters.command(["pin", "unpin"], COMMAND_HANDLER) & filters.group)
@adminsOnly("can_pin_messages")
@ratelimiter
async def pin(_, message):
    """Pin (silently) or unpin the replied-to message.

    The command name decides the action: a command starting with ``u``
    unpins, anything else pins without notification.
    """
    if not message.from_user:
        return
    target = message.reply_to_message
    if not target:
        return await message.reply_text("Reply to a message to pin/unpin it.")
    try:
        if message.command[0].startswith("u"):
            await target.unpin()
            confirmation = f"**Unpinned [this]({target.link}) message.**"
            return await message.reply_text(confirmation, disable_web_page_preview=True)
        await target.pin(disable_notification=True)
        confirmation = f"**Pinned [this]({target.link}) message.**"
        await message.reply(confirmation, disable_web_page_preview=True)
    except ChatAdminRequired:
        await message.reply(
            "Please give me admin access to use this command.",
            disable_web_page_preview=True,
        )
# Mute members
@app.on_message(filters.command(["mute", "tmute"], COMMAND_HANDLER) & filters.group)
@adminsOnly("can_restrict_members")
@ratelimiter
async def mute(client, message):
    """Mute a user by restricting them with empty ``ChatPermissions``.

    * ``/mute``  - mute without an end date.
    * ``/tmute`` - temporary mute; the first word after the user is the
      time value, the rest is the reason.

    Both reply with an "Unmute" button. The bot itself, SUDO users and
    chat admins are refused.

    Args:
        client: The running client; ``client.me.id`` is the bot's own id.
        message: The command message.
    """
    if not message.from_user:
        return
    try:
        user_id, reason = await extract_user_and_reason(message)
    except Exception as err:
        return await message.reply(f"ERROR: {err}")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't mute myself.")
    if user_id in SUDO:
        return await message.reply_text("You wanna mute the elevated one?, RECONSIDER!")
    if user_id in (await list_admins(message.chat.id)):
        return await message.reply_text("I can't mute an admin, You know the rules, so do i.")
    mention = (await app.get_users(user_id)).mention
    keyboard = ikb({"🚨 Unmute 🚨": f"unmute_{user_id}"})
    msg = f"**Muted User:** {mention}\n" f"**Muted By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
    if message.command[0] == "tmute":
        if not reason:
            # Without this guard ``reason.split`` fails on None/empty text.
            return await message.reply_text("Provide a time value for the temporary mute.")
        split = reason.split(None, 1)
        time_value = split[0]
        temp_reason = split[1] if len(split) > 1 else ""
        temp_mute = await time_converter(message, time_value)
        msg += f"**Muted For:** {time_value}\n"
        if temp_reason:
            msg += f"**Reason:** {temp_reason}"
        try:
            # The numeric part (everything but the last character) may
            # have at most two digits.
            if len(time_value[:-1]) < 3:
                await message.chat.restrict_member(
                    user_id,
                    permissions=ChatPermissions(),
                    until_date=temp_mute,
                )
                await message.reply_text(msg, reply_markup=keyboard)
            else:
                await message.reply_text("You can't use more than 99")
        except AttributeError:
            # NOTE(review): presumably raised when time_converter did not
            # return a usable date — confirm against the helper.
            pass
        return
    if reason:
        msg += f"**Reason:** {reason}"
    await message.chat.restrict_member(user_id, permissions=ChatPermissions())
    await message.reply_text(msg, reply_markup=keyboard)
# Unmute members
@app.on_message(filters.command("unmute", COMMAND_HANDLER) & filters.group)
@adminsOnly("can_restrict_members")
@ratelimiter
async def unmute(_, message):
    """Unmute the targeted user via ``chat.unban_member`` and confirm it."""
    if not message.from_user:
        return
    target_id = await extract_user(message)
    if not target_id:
        return await message.reply_text("I can't find that user.")
    await message.chat.unban_member(target_id)
    target = await app.get_users(target_id)
    await message.reply_text(f"Unmuted! {target.mention}")
@app.on_message(filters.command(["warn", "dwarn"], COMMAND_HANDLER) & filters.group)
@adminsOnly("can_restrict_members")
@ratelimiter
async def warn_user(client, message):
    """Warn a user; the third warning bans them and resets the counter.

    ``/dwarn`` additionally deletes the replied-to message when there is
    one. The bot itself, SUDO users and chat admins are refused.

    Args:
        client: The running client; ``client.me.id`` is the bot's own id.
        message: The command message.
    """
    if not message.from_user:
        return
    user_id, reason = await extract_user_and_reason(message)
    chat_id = message.chat.id
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't warn myself, i can leave if you want.")
    if user_id in SUDO:
        return await message.reply_text("You Wanna Warn The Elevated One?, RECONSIDER!")
    if user_id in (await list_admins(chat_id)):
        return await message.reply_text("I can't warn an admin, You know the rules, so do i.")
    # Fetch the user profile and the stored warn record concurrently.
    user, warns = await asyncio.gather(
        app.get_users(user_id),
        get_warn(chat_id, await int_to_alpha(user_id)),
    )
    mention = user.mention
    keyboard = ikb({"🚨 Remove Warn 🚨": f"unwarn_{user_id}"})
    warns = warns["warns"] if warns else 0
    # /dwarn can be issued with an explicit user and no reply; only delete
    # when a replied-to message actually exists.
    if message.command[0][0] == "d" and message.reply_to_message:
        await message.reply_to_message.delete()
    if warns >= 2:
        # Two warnings already on record: this one is the third.
        await message.chat.ban_member(user_id)
        await message.reply_text(f"Number of warns of {mention} exceeded, BANNED!")
        await remove_warns(chat_id, await int_to_alpha(user_id))
    else:
        warn = {"warns": warns + 1}
        msg = f"""
**Warned User:** {mention}
**Warned By:** {message.from_user.mention if message.from_user else 'Anon'}
**Reason:** {reason or 'No Reason Provided.'}
**Warns:** {warns + 1}/3"""
        await message.reply_text(msg, reply_markup=keyboard)
        await add_warn(chat_id, await int_to_alpha(user_id), warn)
@app.on_callback_query(filters.regex("unwarn_"))
@ratelimiter
async def remove_warning(_, cq):
    """Handle the "Remove Warn" button: take one warning off the user.

    The presser needs ``can_restrict_members``. On success the stored
    count is decremented and the original message is struck through with
    a note naming who removed the warn.
    """
    presser = cq.from_user
    chat_id = cq.message.chat.id
    needed = "can_restrict_members"
    granted = await member_permissions(chat_id, presser.id)
    if needed not in granted:
        return await cq.answer(
            "You don't have enough permissions to perform this action.\n" + f"Permission needed: {needed}",
            show_alert=True,
        )
    # Callback data has the form "unwarn_<user id>".
    target_id = cq.data.split("_")[1]
    record = await get_warn(chat_id, await int_to_alpha(target_id))
    current = record["warns"] if record else record
    if not current:
        return await cq.answer("User has no warnings.")
    await add_warn(chat_id, await int_to_alpha(target_id), {"warns": current - 1})
    struck = f"~~{cq.message.text.markdown}~~\n\n"
    await cq.message.edit(struck + f"__Warn removed by {presser.mention}__")
@app.on_callback_query(filters.regex("unmute_"))
@ratelimiter
async def unmute_user(_, cq):
    """Handle the "Unmute" button: lift the mute and annotate the message.

    The presser needs ``can_restrict_members``. The original message is
    struck through with a note naming who removed the mute.
    """
    presser = cq.from_user
    needed = "can_restrict_members"
    granted = await member_permissions(cq.message.chat.id, presser.id)
    if needed not in granted:
        return await cq.answer(
            "You don't have enough permissions to perform this action.\n" + f"Permission needed: {needed}",
            show_alert=True,
        )
    # Callback data has the form "unmute_<user id>".
    target_id = cq.data.split("_")[1]
    # Build the new text first so a message without text fails before unmuting.
    struck = f"~~{cq.message.text.markdown}~~\n\n"
    new_text = struck + f"__Mute removed by {presser.mention}__"
    await cq.message.chat.unban_member(target_id)
    await cq.message.edit(new_text)
@app.on_callback_query(filters.regex("unban_"))
@ratelimiter
async def unban_user(_, cq):
    """Handle the "Unban" button: lift the ban and annotate the message.

    The presser needs ``can_restrict_members``. The original message is
    struck through with a note naming who removed the ban.
    """
    from_user = cq.from_user
    chat_id = cq.message.chat.id
    permissions = await member_permissions(chat_id, from_user.id)
    permission = "can_restrict_members"
    if permission not in permissions:
        return await cq.answer(
            "You don't have enough permissions to perform this action.\n" + f"Permission needed: {permission}",
            show_alert=True,
        )
    # Callback data has the form "unban_<user id>".
    user_id = cq.data.split("_")[1]
    # No user lookup here: its result was never used, and a failed lookup
    # would have aborted the unban.
    text = cq.message.text.markdown
    text = f"~~{text}~~\n\n"
    text += f"__Banned removed by {from_user.mention}__"
    await cq.message.chat.unban_member(user_id)
    await cq.message.edit(text)
# Remove Warn
# "rmwarns" is the name advertised in the help text; "rmwarn" is kept so the
# previously registered command keeps working.
@app.on_message(filters.command(["rmwarns", "rmwarn"], COMMAND_HANDLER) & filters.group)
@adminsOnly("can_restrict_members")
@ratelimiter
async def remove_warnings(_, message):
    """Remove every stored warning of the user whose message was replied to."""
    if not message.from_user:
        return
    reply = message.reply_to_message
    # A reply without a user author cannot be resolved to a user either.
    if not reply or not reply.from_user:
        return await message.reply_text("Reply to a message to remove a user's warnings.")
    user_id = reply.from_user.id
    mention = reply.from_user.mention
    chat_id = message.chat.id
    warns = await get_warn(chat_id, await int_to_alpha(user_id))
    if warns:
        warns = warns["warns"]
    if not warns:
        await message.reply_text(f"{mention} have no warnings.")
    else:
        await remove_warns(chat_id, await int_to_alpha(user_id))
        await message.reply_text(f"Removed warnings of {mention}.")
# Warns
@app.on_message(filters.command("warns", COMMAND_HANDLER) & filters.group)
@capture_err
@ratelimiter
async def check_warns(_, message):
    """Report how many warnings (out of 3) the targeted user has."""
    if not message.from_user:
        return
    target_id = await extract_user(message)
    if not target_id:
        return await message.reply_text("I can't find that user.")
    record = await get_warn(message.chat.id, await int_to_alpha(target_id))
    target = await app.get_users(target_id)
    mention = target.mention
    if not record:
        return await message.reply_text(f"{mention} has no warnings.")
    count = record["warns"]
    return await message.reply_text(f"{mention} has {count}/3 warnings.")
# Report User in Group
@app.on_message((filters.command("report", COMMAND_HANDLER) | filters.command(["admins", "admin"], prefixes="@")) & filters.group)
@capture_err
@ratelimiter
async def report_user(_, message):
    """Report the replied-to message by silently mentioning every admin.

    Self-reports are refused, as are reports against admins, the group
    itself or its linked chat. The notice is sent via ``kirimPesan`` on
    the reported message and carries one invisible mention link per
    human, non-deleted administrator.
    """
    if not message.reply_to_message:
        return await message.reply_text("Reply to a message to report that user.")
    reply = message.reply_to_message
    reply_id = reply.from_user.id if reply.from_user else reply.sender_chat.id
    user_id = message.from_user.id if message.from_user else message.sender_chat.id
    if reply_id == user_id:
        return await message.reply_text("Why are you reporting yourself ?")
    list_of_admins = await list_admins(message.chat.id)
    linked_chat = (await app.get_chat(message.chat.id)).linked_chat
    is_protected = reply_id in list_of_admins or reply_id == message.chat.id
    if linked_chat is not None:
        is_protected = is_protected or reply_id == linked_chat.id
    if is_protected:
        return await message.reply_text("Do you know that the user you are replying is an admin ?")
    user_mention = reply.from_user.mention if reply.from_user else reply.sender_chat.title
    parts = [f"Reported {user_mention} to admins!"]
    async for admin in app.get_chat_members(message.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS):
        if admin.user.is_bot or admin.user.is_deleted:
            # skip bots and deleted admins
            continue
        # U+2063 is an invisible separator, so the link itself shows no
        # text. The href value must be closed with a quote to be valid.
        parts.append(f"<a href='tg://user?id={admin.user.id}'>\u2063</a>")
    await kirimPesan(message.reply_to_message, "".join(parts))