MissKatyPyro/misskaty/plugins/admin.py
2022-12-02 21:54:44 +07:00

717 lines
25 KiB
Python

import asyncio, re
from logging import getLogger
from misskaty import app
from misskaty.helper.functions import (
extract_user_and_reason,
time_converter,
extract_user,
int_to_alpha,
)
from time import time
from pyrogram import filters, enums
from pyrogram.errors import FloodWait, ChatAdminRequired
from pyrogram.types import ChatPermissions
from misskaty.core.decorator.permissions import (
adminsOnly,
admins_in_chat,
list_admins,
member_permissions,
)
from misskaty.core.decorator.errors import capture_err
from misskaty.core.keyboard import ikb
from misskaty.vars import SUDO, COMMAND_HANDLER
from database.warn_db import get_warn, remove_warns, add_warn
LOGGER = getLogger(__name__)
__MODULE__ = "Admin"
__HELP__ = """
/ban - Ban A User From A Group
/dban - Delete the replied message banning its sender
/tban - Ban A User For Specific Time
/unban - Unban A User
/listban - Ban a user from groups listed in a message
/listunban - Unban a user from groups listed in a message
/warn - Warn A User
/dwarn - Delete the replied message warning its sender
/rmwarns - Remove All Warning of A User
/warns - Show Warning Of A User
/kick - Kick A User
/dkick - Delete the replied message kicking its sender
/purge - Purge Messages
/purge [n] - Purge "n" number of messages from replied message
/del - Delete Replied Message
/promote - Promote A Member
/fullpromote - Promote A Member With All Rights
/demote - Demote A Member
/pin - Pin A Message
/mute - Mute A User
/tmute - Mute A User For Specific Time
/unmute - Unmute A User
/ban_ghosts - Ban Deleted Accounts
/report | @admins | @admin - Report A Message To Admins.
"""
# Admin cache reload
@app.on_chat_member_updated()
async def admin_cache_func(_, cmu):
if cmu.old_chat_member and cmu.old_chat_member.promoted_by:
admins_in_chat[cmu.chat.id] = {
"last_updated_at": time(),
"data": [
member.user.id
async for member in app.get_chat_members(
cmu.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS
)
],
}
LOGGER.info(f"Updated admin cache for {cmu.chat.id} [{cmu.chat.title}]")
# Purge CMD
@app.on_message(filters.command("purge", COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_delete_messages")
async def purge(_, message):
repliedmsg = message.reply_to_message
await message.delete()
if not repliedmsg:
return await message.reply_text("Reply to a message to purge from.")
cmd = message.command
if len(cmd) > 1 and cmd[1].isdigit():
purge_to = repliedmsg.id + int(cmd[1])
if purge_to > message.id:
purge_to = message.id
else:
purge_to = message.id
chat_id = message.chat.id
message_ids = []
del_total = 0
for message_id in range(
repliedmsg.id,
purge_to,
):
message_ids.append(message_id)
# Max message deletion limit is 100
if len(message_ids) == 100:
await app.delete_messages(
chat_id=chat_id,
message_ids=message_ids,
revoke=True, # For both sides
)
del_total += len(message_ids)
# To delete more than 100 messages, start again
message_ids = []
# Delete if any messages left
if len(message_ids) > 0:
await app.delete_messages(
chat_id=chat_id,
message_ids=message_ids,
revoke=True,
)
del_total += len(message_ids)
await message.reply(f"Successfully deleted {del_total} messages..")
# Kick members
@app.on_message(filters.command(["kick", "dkick"], COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_restrict_members")
async def kickFunc(_, message):
user_id, reason = await extract_user_and_reason(message)
if not user_id:
return await message.reply_text("I can't find that user.")
if user_id == 1507530289:
return await message.reply_text("I can't kick myself, i can leave if you want.")
if user_id in SUDO:
return await message.reply_text("Wow, you wanna kick my owner?")
if user_id in (await list_admins(message.chat.id)):
return await message.reply_text("Lol, it's crazy if i can kick an admin.")
mention = (await app.get_users(user_id)).mention
msg = f"""
**Kicked User:** {mention}
**Kicked By:** {message.from_user.mention if message.from_user else 'Anon'}
**Reason:** {reason or '-'}"""
if message.command[0][0] == "d":
await message.reply_to_message.delete()
await message.chat.ban_member(user_id)
await message.reply_text(msg)
await asyncio.sleep(1)
await message.chat.unban_member(user_id)
# Ban/DBan/TBan User
@app.on_message(
filters.command(["ban", "dban", "tban"], COMMAND_HANDLER) & ~filters.private
)
@adminsOnly("can_restrict_members")
async def banFunc(_, message):
user_id, reason = await extract_user_and_reason(message, sender_chat=True)
if not user_id:
return await message.reply_text("I can't find that user.")
if user_id == 1507530289:
return await message.reply_text("I can't ban myself, i can leave if you want.")
if user_id in SUDO:
return await message.reply_text("You Wanna Ban The Elevated One?, RECONSIDER!")
if user_id in (await list_admins(message.chat.id)):
return await message.reply_text(
"I can't ban an admin, You know the rules, so do i."
)
try:
mention = (await app.get_users(user_id)).mention
except IndexError:
mention = (
message.reply_to_message.sender_chat.title
if message.reply_to_message
else "Anon"
)
msg = (
f"**Banned User:** {mention}\n"
f"**Banned By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
)
if message.command[0][0] == "d":
await message.reply_to_message.delete()
if message.command[0] == "tban":
split = reason.split(None, 1)
time_value = split[0]
temp_reason = split[1] if len(split) > 1 else ""
temp_ban = await time_converter(message, time_value)
msg += f"**Banned For:** {time_value}\n"
if temp_reason:
msg += f"**Reason:** {temp_reason}"
try:
if len(time_value[:-1]) < 3:
await message.chat.ban_member(user_id, until_date=temp_ban)
await message.reply_text(msg)
else:
await message.reply_text("You can't use more than 99")
except AttributeError:
pass
return
if reason:
msg += f"**Reason:** {reason}"
keyboard = ikb({"🚨 Unban 🚨": f"unban_{user_id}"})
await message.chat.ban_member(user_id)
await message.reply_text(msg, reply_markup=keyboard)
# Unban members
@app.on_message(filters.command("unban", COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_restrict_members")
async def unban_func(_, message):
# we don't need reasons for unban, also, we
# don't need to get "text_mention" entity, because
# normal users won't get text_mention if the user
# they want to unban is not in the group.
reply = message.reply_to_message
if reply and reply.sender_chat and reply.sender_chat != message.chat.id:
return await message.reply_text("You cannot unban a channel")
if len(message.command) == 2:
user = message.text.split(None, 1)[1]
elif len(message.command) == 1 and reply:
user = message.reply_to_message.from_user.id
else:
return await message.reply_text(
"Provide a username or reply to a user's message to unban."
)
await message.chat.unban_member(user)
umention = (await app.get_users(user)).mention
await message.reply_text(f"Unbanned! {umention}")
# Ban users listed in a message
@app.on_message(
filters.user(SUDO) & filters.command("listban", COMMAND_HANDLER) & ~filters.private
)
async def list_ban_(c, message):
userid, msglink_reason = await extract_user_and_reason(message)
if not userid or not msglink_reason:
return await message.reply_text(
"Provide a userid/username along with message link and reason to list-ban"
)
if len(msglink_reason.split(" ")) == 1: # message link included with the reason
return await message.reply_text("You must provide a reason to list-ban")
# seperate messge link from reason
lreason = msglink_reason.split()
messagelink, reason = lreason[0], " ".join(lreason[1:])
if not re.search(
r"(https?://)?t(elegram)?\.me/\w+/\d+", messagelink
): # validate link
return await message.reply_text("Invalid message link provided")
if userid == 1507530289:
return await message.reply_text("I can't ban myself.")
if userid in SUDO:
return await message.reply_text("You Wanna Ban The Elevated One?, RECONSIDER!")
splitted = messagelink.split("/")
uname, mid = splitted[-2], int(splitted[-1])
m = await message.reply_text(
"`Banning User from multiple groups. This may take some time`"
)
try:
msgtext = (await app.get_messages(uname, mid)).text
gusernames = re.findall("@\w+", msgtext)
except:
return await m.edit_text("Could not get group usernames")
count = 0
for username in gusernames:
try:
await app.ban_chat_member(username.strip("@"), userid)
await asyncio.sleep(1)
except FloodWait as e:
await asyncio.sleep(e.value)
except:
continue
count += 1
mention = (await app.get_users(userid)).mention
msg = f"""
**List-Banned User:** {mention}
**Banned User ID:** `{userid}`
**Admin:** {message.from_user.mention}
**Affected chats:** `{count}`
**Reason:** {reason}
"""
await m.edit_text(msg)
# Unban users listed in a message
@app.on_message(
filters.user(SUDO)
& filters.command("listunban", COMMAND_HANDLER)
& ~filters.private
)
async def list_unban_(c, message):
userid, msglink = await extract_user_and_reason(message)
if not userid or not msglink:
return await message.reply_text(
"Provide a userid/username along with message link to list-unban"
)
if not re.search(r"(https?://)?t(elegram)?\.me/\w+/\d+", msglink): # validate link
return await message.reply_text("Invalid message link provided")
splitted = msglink.split("/")
uname, mid = splitted[-2], int(splitted[-1])
m = await message.reply_text(
"`Unbanning User from multiple groups. \
This may take some time`"
)
try:
msgtext = (await app.get_messages(uname, mid)).text
gusernames = re.findall("@\w+", msgtext)
except:
return await m.edit_text("Could not get the group usernames")
count = 0
for username in gusernames:
try:
await app.unban_chat_member(username.strip("@"), userid)
await asyncio.sleep(1)
except FloodWait as e:
await asyncio.sleep(e.x)
except:
continue
count += 1
mention = (await app.get_users(userid)).mention
msg = f"""
**List-Unbanned User:** {mention}
**Unbanned User ID:** `{userid}`
**Admin:** {message.from_user.mention}
**Affected chats:** `{count}`
"""
await m.edit_text(msg)
# Delete messages
@app.on_message(filters.command("del", COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_delete_messages")
async def deleteFunc(_, message):
if not message.reply_to_message:
return await message.reply_text("Reply To A Message To Delete It")
await message.reply_to_message.delete()
await message.delete()
# Promote Members
@app.on_message(
filters.command(["promote", "fullpromote"], COMMAND_HANDLER) & ~filters.private
)
@adminsOnly("can_promote_members")
async def promoteFunc(_, message):
user_id = await extract_user(message)
umention = (await app.get_users(user_id)).mention
if not user_id:
return await message.reply_text("I can't find that user.")
bot = await app.get_chat_member(message.chat.id, 1507530289)
if user_id == 1507530289:
return await message.reply_text("I can't promote myself.")
if not bot.can_promote_members:
return await message.reply_text("I don't have enough permissions")
if message.command[0][0] == "f":
await message.chat.promote_member(
user_id=user_id,
can_change_info=bot.can_change_info,
can_invite_users=bot.can_invite_users,
can_delete_messages=bot.can_delete_messages,
can_restrict_members=bot.can_restrict_members,
can_pin_messages=bot.can_pin_messages,
can_promote_members=bot.can_promote_members,
can_manage_chat=bot.can_manage_chat,
can_manage_voice_chats=bot.can_manage_voice_chats,
)
return await message.reply_text(f"Fully Promoted! {umention}")
await message.chat.promote_member(
user_id=user_id,
can_change_info=False,
can_invite_users=bot.can_invite_users,
can_delete_messages=bot.can_delete_messages,
can_restrict_members=False,
can_pin_messages=False,
can_promote_members=False,
can_manage_chat=bot.can_manage_chat,
can_manage_voice_chats=bot.can_manage_voice_chats,
)
await message.reply_text(f"Promoted! {umention}")
# Demote Member
@app.on_message(filters.command("demote", COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_promote_members")
async def demote(_, message):
user_id = await extract_user(message)
if not user_id:
return await message.reply_text("I can't find that user.")
if user_id == 1507530289:
return await message.reply_text("I can't demote myself.")
if user_id in SUDO:
return await message.reply_text("You wanna demote the elevated one?")
await message.chat.promote_member(
user_id=user_id,
can_change_info=False,
can_invite_users=False,
can_delete_messages=False,
can_restrict_members=False,
can_pin_messages=False,
can_promote_members=False,
can_manage_chat=False,
can_manage_voice_chats=False,
)
umention = (await app.get_users(user_id)).mention
await message.reply_text(f"Demoted! {umention}")
# Pin Messages
@app.on_message(filters.command(["pin", "unpin"], COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_pin_messages")
async def pin(_, message):
if not message.reply_to_message:
return await message.reply_text("Reply to a message to pin/unpin it.")
r = message.reply_to_message
try:
if message.command[0][0] == "u":
await r.unpin()
return await message.reply_text(
f"**Unpinned [this]({r.link}) message.**",
disable_web_page_preview=True,
)
await r.pin(disable_notification=True)
await message.reply(
f"**Pinned [this]({r.link}) message.**",
disable_web_page_preview=True,
)
except ChatAdminRequired:
await message.reply(
"Please give me admin access to use this command.",
disable_web_page_preview=True,
)
# Mute members
@app.on_message(filters.command(["mute", "tmute"], COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_restrict_members")
async def mute(_, message):
user_id, reason = await extract_user_and_reason(message)
if not user_id:
return await message.reply_text("I can't find that user.")
if user_id == 1507530289:
return await message.reply_text("I can't mute myself.")
if user_id in SUDO:
return await message.reply_text("You wanna mute the elevated one?, RECONSIDER!")
if user_id in (await list_admins(message.chat.id)):
return await message.reply_text(
"I can't mute an admin, You know the rules, so do i."
)
mention = (await app.get_users(user_id)).mention
keyboard = ikb({"🚨 Unmute 🚨": f"unmute_{user_id}"})
msg = (
f"**Muted User:** {mention}\n"
f"**Muted By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
)
if message.command[0] == "tmute":
split = reason.split(None, 1)
time_value = split[0]
temp_reason = split[1] if len(split) > 1 else ""
temp_mute = await time_converter(message, time_value)
msg += f"**Muted For:** {time_value}\n"
if temp_reason:
msg += f"**Reason:** {temp_reason}"
try:
if len(time_value[:-1]) < 3:
await message.chat.restrict_member(
user_id,
permissions=ChatPermissions(),
until_date=temp_mute,
)
await message.reply_text(msg, reply_markup=keyboard)
else:
await message.reply_text("You can't use more than 99")
except AttributeError:
pass
return
if reason:
msg += f"**Reason:** {reason}"
await message.chat.restrict_member(user_id, permissions=ChatPermissions())
await message.reply_text(msg, reply_markup=keyboard)
# Unmute members
@app.on_message(filters.command("unmute", COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_restrict_members")
async def unmute(_, message):
user_id = await extract_user(message)
if not user_id:
return await message.reply_text("I can't find that user.")
await message.chat.unban_member(user_id)
umention = (await app.get_users(user_id)).mention
await message.reply_text(f"Unmuted! {umention}")
# Ban deleted accounts
@app.on_message(filters.command("ban_ghosts", COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_restrict_members")
async def ban_deleted_accounts(_, message):
chat_id = message.chat.id
deleted_users = []
m = await message.reply("Finding ghosts...")
async for i in app.iter_chat_members(chat_id):
if i.user.is_deleted:
deleted_users.append(i.user.id)
if deleted_users:
banned_users = 0
for deleted_user in deleted_users:
try:
await message.chat.ban_member(deleted_user)
except Exception:
pass
banned_users += 1
await m.edit(f"Banned {banned_users} Deleted Accounts")
else:
await m.edit("There are no deleted accounts in this chat")
@app.on_message(filters.command(["warn", "dwarn"], COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_restrict_members")
async def warn_user(_, message):
user_id, reason = await extract_user_and_reason(message)
chat_id = message.chat.id
if not user_id:
return await message.reply_text("I can't find that user.")
if user_id == 1507530289:
return await message.reply_text("I can't warn myself, i can leave if you want.")
if user_id in SUDO:
return await message.reply_text("You Wanna Warn The Elevated One?, RECONSIDER!")
if user_id in (await list_admins(chat_id)):
return await message.reply_text(
"I can't warn an admin, You know the rules, so do i."
)
user, warns = await asyncio.gather(
app.get_users(user_id),
get_warn(chat_id, await int_to_alpha(user_id)),
)
mention = user.mention
keyboard = ikb({"🚨 Remove Warn 🚨": f"unwarn_{user_id}"})
warns = warns["warns"] if warns else 0
if message.command[0][0] == "d":
await message.reply_to_message.delete()
if warns >= 2:
await message.chat.ban_member(user_id)
await message.reply_text(f"Number of warns of {mention} exceeded, BANNED!")
await remove_warns(chat_id, await int_to_alpha(user_id))
else:
warn = {"warns": warns + 1}
msg = f"""
**Warned User:** {mention}
**Warned By:** {message.from_user.mention if message.from_user else 'Anon'}
**Reason:** {reason or 'No Reason Provided.'}
**Warns:** {warns + 1}/3"""
await message.reply_text(msg, reply_markup=keyboard)
await add_warn(chat_id, await int_to_alpha(user_id), warn)
@app.on_callback_query(filters.regex("unwarn_"))
async def remove_warning(_, cq):
from_user = cq.from_user
chat_id = cq.message.chat.id
permissions = await member_permissions(chat_id, from_user.id)
permission = "can_restrict_members"
if permission not in permissions:
return await cq.answer(
"You don't have enough permissions to perform this action.\n"
+ f"Permission needed: {permission}",
show_alert=True,
)
user_id = cq.data.split("_")[1]
warns = await get_warn(chat_id, await int_to_alpha(user_id))
if warns:
warns = warns["warns"]
if not warns or warns == 0:
return await cq.answer("User has no warnings.")
warn = {"warns": warns - 1}
await add_warn(chat_id, await int_to_alpha(user_id), warn)
text = cq.message.text.markdown
text = f"~~{text}~~\n\n"
text += f"__Warn removed by {from_user.mention}__"
await cq.message.edit(text)
@app.on_callback_query(filters.regex("unmute_"))
async def unmute_user(_, cq):
from_user = cq.from_user
chat_id = cq.message.chat.id
permissions = await member_permissions(chat_id, from_user.id)
permission = "can_restrict_members"
if permission not in permissions:
return await cq.answer(
"You don't have enough permissions to perform this action.\n"
+ f"Permission needed: {permission}",
show_alert=True,
)
user_id = cq.data.split("_")[1]
text = cq.message.text.markdown
text = f"~~{text}~~\n\n"
text += f"__Mute removed by {from_user.mention}__"
await cq.message.chat.unban_member(user_id)
await cq.message.edit(text)
@app.on_callback_query(filters.regex("unban_"))
async def unban_user(_, cq):
from_user = cq.from_user
chat_id = cq.message.chat.id
permissions = await member_permissions(chat_id, from_user.id)
permission = "can_restrict_members"
if permission not in permissions:
return await cq.answer(
"You don't have enough permissions to perform this action.\n"
+ f"Permission needed: {permission}",
show_alert=True,
)
user_id = cq.data.split("_")[1]
(await app.get_users(user_id)).mention
text = cq.message.text.markdown
text = f"~~{text}~~\n\n"
text += f"__Banned removed by {from_user.mention}__"
await cq.message.chat.unban_member(user_id)
await cq.message.edit(text)
# Remove Warn
@app.on_message(filters.command("rmwarn", COMMAND_HANDLER) & ~filters.private)
@adminsOnly("can_restrict_members")
async def remove_warnings(_, message):
if not message.reply_to_message:
return await message.reply_text(
"Reply to a message to remove a user's warnings."
)
user_id = message.reply_to_message.from_user.id
mention = message.reply_to_message.from_user.mention
chat_id = message.chat.id
warns = await get_warn(chat_id, await int_to_alpha(user_id))
if warns:
warns = warns["warns"]
if warns == 0 or not warns:
await message.reply_text(f"{mention} have no warnings.")
else:
await remove_warns(chat_id, await int_to_alpha(user_id))
await message.reply_text(f"Removed warnings of {mention}.")
# Warns
@app.on_message(filters.command("warns", COMMAND_HANDLER) & ~filters.private)
@capture_err
async def check_warns(_, message):
user_id = await extract_user(message)
if not user_id:
return await message.reply_text("I can't find that user.")
warns = await get_warn(message.chat.id, await int_to_alpha(user_id))
mention = (await app.get_users(user_id)).mention
if warns:
warns = warns["warns"]
else:
return await message.reply_text(f"{mention} has no warnings.")
return await message.reply_text(f"{mention} has {warns}/3 warnings.")
# Report User in Group
@app.on_message(
(
filters.command("report", COMMAND_HANDLER)
| filters.command(["admins", "admin"], prefixes="@")
)
& ~filters.private
)
@capture_err
async def report_user(_, message):
if not message.reply_to_message:
return await message.reply_text("Reply to a message to report that user.")
reply = message.reply_to_message
reply_id = reply.from_user.id if reply.from_user else reply.sender_chat.id
user_id = message.from_user.id if message.from_user else message.sender_chat.id
if reply_id == user_id:
return await message.reply_text("Why are you reporting yourself ?")
list_of_admins = await list_admins(message.chat.id)
linked_chat = (await app.get_chat(message.chat.id)).linked_chat
if linked_chat is None:
if reply_id in list_of_admins or reply_id == message.chat.id:
return await message.reply_text(
"Do you know that the user you are replying is an admin ?"
)
elif (
reply_id in list_of_admins
or reply_id == message.chat.id
or reply_id == linked_chat.id
):
return await message.reply_text(
"Do you know that the user you are replying is an admin ?"
)
user_mention = (
reply.from_user.mention if reply.from_user else reply.sender_chat.title
)
text = f"Reported {user_mention} to admins!"
admin_data = [
m
async for m in app.get_chat_members(
message.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS
)
]
for admin in admin_data:
if admin.user.is_bot or admin.user.is_deleted:
# return bots or deleted admins
continue
text += f"[\u2063](tg://user?id={admin.user.id})"
await message.reply_to_message.reply_text(text)