diff --git a/database/gban_db.py b/database/gban_db.py new file mode 100644 index 00000000..9ef51e01 --- /dev/null +++ b/database/gban_db.py @@ -0,0 +1,21 @@ +from database import dbname + +gbansdb = dbname.gban + +async def is_gbanned_user(user_id: int) -> bool: + user = await gbansdb.find_one({"user_id": user_id}) + if not user: + return False + return True + +async def add_gban_user(user_id: int): + is_gbanned = await is_gbanned_user(user_id) + if is_gbanned: + return + return await gbansdb.insert_one({"user_id": user_id}) + +async def remove_gban_user(user_id: int): + is_gbanned = await is_gbanned_user(user_id) + if not is_gbanned: + return + return await gbansdb.delete_one({"user_id": user_id}) \ No newline at end of file diff --git a/misskaty/helper/human_read.py b/misskaty/helper/human_read.py index ad2925ee..0a7ddb44 100644 --- a/misskaty/helper/human_read.py +++ b/misskaty/helper/human_read.py @@ -19,17 +19,17 @@ def get_readable_time(seconds: int) -> str: (days, remainder) = divmod(seconds, 86400) days = int(days) if days != 0: - result += f"{days} day " + result += f"{days} d " (hours, remainder) = divmod(remainder, 3600) hours = int(hours) if hours != 0: - result += f"{hours} hour " + result += f"{hours} h " (minutes, seconds) = divmod(remainder, 60) minutes = int(minutes) if minutes != 0: - result += f"{minutes} minute " + result += f"{minutes} m " seconds = int(seconds) - result += f"{seconds} second " + result += f"{seconds} s " return result @@ -46,7 +46,7 @@ def get_readable_time2(seconds: int) -> str: count = 0 ping_time = "" time_list = [] - time_suffix_list = ["second", "minute", "hour", "day", "week", "month", "year"] + time_suffix_list = ["s", "m", "h", "d", "w", "m", "y"] while count < 4: count += 1 remainder, result = divmod(seconds, 60) if count < 3 else divmod(seconds, 24) diff --git a/misskaty/plugins/dev.py b/misskaty/plugins/dev.py index d4d00373..bd61ea48 100644 --- a/misskaty/plugins/dev.py +++ b/misskaty/plugins/dev.py @@ -16,6 +16,7 @@ from time import time from database.users_chats_db import db from inspect import getfullargspec from typing import Any, Optional, Tuple +from database.gban_db import is_gbanned_user, add_gban_user, remove_gban_user from psutil import cpu_percent, Process from psutil import disk_usage as disk_usage_percent @@ -31,6 +32,10 @@ from misskaty import app, user, botStartTime, misskaty_version, BOT_NAME from misskaty.helper.http import http from misskaty.helper.eval_helper import meval, format_exception from misskaty.helper.localization import use_chat_lang +from misskaty.helper.functions import ( + extract_user, + extract_user_and_reason +) from misskaty.helper.human_read import get_readable_file_size, get_readable_time from misskaty.vars import COMMAND_HANDLER, SUDO, LOG_CHANNEL @@ -45,6 +50,8 @@ __HELP__ = """ /enablechat [chat id] - Add Blacklist group /banuser [chat id] - Ban user and block user so cannot use bot /unbanuser [chat id] - Unban user and make their can use bot again +/gban - To Ban A User Globally. +/ungban - To remove ban user globbaly. **For Public Use** /stats - Check statistic bot @@ -200,6 +207,85 @@ async def server_stats(self: Client, ctx: Message) -> "Message": os.remove("stats.png") +# Gban +@app.on_message(filters.command("gban", COMMAND_HANDLER) & filters.user(SUDO)) +async def ban_globally(self: Client, ctx: Message): + user_id, reason = await extract_user_and_reason(ctx) + user = await app.get_users(user_id) + from_user = ctx.from_user + + if not user_id: + return await ctx.reply_text("I can't find that user.") + if not reason: + return await ctx.reply("No reason provided.") + + if user_id in [from_user.id, self.me.id] or user_id in SUDO: + return await ctx.reply_text("I can't ban that user.") + served_chats = await db.get_all_chats() + m = await ctx.reply_text( + f"**Banning {user.mention} Globally!**" + + f" **This Action Should Take About {len(served_chats)} Seconds.**" + ) + await add_gban_user(user_id) + number_of_chats = 0 + for served_chat in served_chats: + try: + await app.ban_chat_member(served_chat["chat_id"], user.id) + number_of_chats += 1 + await asyncio.sleep(1) + except FloodWait as e: + await asyncio.sleep(int(e.value)) + except Exception: + pass + try: + await app.send_message( + user.id, + f"Hello, You have been globally banned by {from_user.mention}," + + " You can appeal for this ban by talking to him.", + ) + except Exception: + pass + await m.edit(f"Banned {user.mention} Globally!") + ban_text = f""" +__**New Global Ban**__ +**Origin:** {message.chat.title} [`{message.chat.id}`] +**Admin:** {from_user.mention} +**Banned User:** {user.mention} +**Banned User ID:** `{user_id}` +**Reason:** __{reason}__ +**Chats:** `{number_of_chats}`""" + try: + m2 = await app.send_message( + LOG_CHANNEL, + text=ban_text, + disable_web_page_preview=True, + ) + await m.edit( + f"Banned {user.mention} Globally!\nAction Log: {m2.link}", + disable_web_page_preview=True, + ) + except Exception: + await ctx.reply_text( + "User Gbanned, But This Gban Action Wasn't Logged, Add Me In LOG_CHANNEL" + ) + + +# Ungban +@app.on_message(filters.command("ungban", COMMAND_HANDLER) & filters.user(SUDO)) +async def unban_globally(self: Client, ctx: Message): + user_id = await extract_user(ctx) + if not user_id: + return await message.reply_text("I can't find that user.") + user = await app.get_users(user_id) + + is_gbanned = await is_gbanned_user(user.id) + if not is_gbanned: + await ctx.reply_text("I don't remember Gbanning him.") + else: + await remove_gban_user(user.id) + await ctx.reply_text(f"Lifted {user.mention}'s Global Ban.'") + + @app.on_message(filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO)) @app.on_edited_message(filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO)) @user.on_message(filters.command(["shell", "sh", "term"], ".") & filters.me)