mirror of
https://github.com/yasirarism/MissKatyPyro.git
synced 2025-12-29 17:44:50 +00:00
Try add gban plugins
This commit is contained in:
parent
a85dcf88ac
commit
600b8f1136
3 changed files with 112 additions and 5 deletions
21
database/gban_db.py
Normal file
21
database/gban_db.py
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
from database import dbname
|
||||
|
||||
gbansdb = dbname.gban
|
||||
|
||||
async def is_gbanned_user(user_id: int) -> bool:
|
||||
user = await gbansdb.find_one({"user_id": user_id})
|
||||
if not user:
|
||||
return False
|
||||
return True
|
||||
|
||||
async def add_gban_user(user_id: int):
|
||||
is_gbanned = await is_gbanned_user(user_id)
|
||||
if is_gbanned:
|
||||
return
|
||||
return await gbansdb.insert_one({"user_id": user_id})
|
||||
|
||||
async def remove_gban_user(user_id: int):
|
||||
is_gbanned = await is_gbanned_user(user_id)
|
||||
if not is_gbanned:
|
||||
return
|
||||
return await gbansdb.delete_one({"user_id": user_id})
|
||||
|
|
@ -19,17 +19,17 @@ def get_readable_time(seconds: int) -> str:
|
|||
(days, remainder) = divmod(seconds, 86400)
|
||||
days = int(days)
|
||||
if days != 0:
|
||||
result += f"{days} day "
|
||||
result += f"{days} d "
|
||||
(hours, remainder) = divmod(remainder, 3600)
|
||||
hours = int(hours)
|
||||
if hours != 0:
|
||||
result += f"{hours} hour "
|
||||
result += f"{hours} h "
|
||||
(minutes, seconds) = divmod(remainder, 60)
|
||||
minutes = int(minutes)
|
||||
if minutes != 0:
|
||||
result += f"{minutes} minute "
|
||||
result += f"{minutes} m "
|
||||
seconds = int(seconds)
|
||||
result += f"{seconds} second "
|
||||
result += f"{seconds} s "
|
||||
return result
|
||||
|
||||
|
||||
|
|
@ -46,7 +46,7 @@ def get_readable_time2(seconds: int) -> str:
|
|||
count = 0
|
||||
ping_time = ""
|
||||
time_list = []
|
||||
time_suffix_list = ["second", "minute", "hour", "day", "week", "month", "year"]
|
||||
time_suffix_list = ["s", "m", "h", "d", "w", "m", "y"]
|
||||
while count < 4:
|
||||
count += 1
|
||||
remainder, result = divmod(seconds, 60) if count < 3 else divmod(seconds, 24)
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from time import time
|
|||
from database.users_chats_db import db
|
||||
from inspect import getfullargspec
|
||||
from typing import Any, Optional, Tuple
|
||||
from database.gban_db import is_gbanned_user, add_gban_user, remove_gban_user
|
||||
|
||||
from psutil import cpu_percent, Process
|
||||
from psutil import disk_usage as disk_usage_percent
|
||||
|
|
@ -31,6 +32,10 @@ from misskaty import app, user, botStartTime, misskaty_version, BOT_NAME
|
|||
from misskaty.helper.http import http
|
||||
from misskaty.helper.eval_helper import meval, format_exception
|
||||
from misskaty.helper.localization import use_chat_lang
|
||||
from misskaty.helper.functions import (
|
||||
extract_user,
|
||||
extract_user_and_reason
|
||||
)
|
||||
from misskaty.helper.human_read import get_readable_file_size, get_readable_time
|
||||
from misskaty.vars import COMMAND_HANDLER, SUDO, LOG_CHANNEL
|
||||
|
||||
|
|
@ -45,6 +50,8 @@ __HELP__ = """
|
|||
/enablechat [chat id] - Add Blacklist group
|
||||
/banuser [chat id] - Ban user and block user so cannot use bot
|
||||
/unbanuser [chat id] - Unban user and make their can use bot again
|
||||
/gban - To Ban A User Globally.
|
||||
/ungban - To remove ban user globbaly.
|
||||
|
||||
**For Public Use**
|
||||
/stats - Check statistic bot
|
||||
|
|
@ -200,6 +207,85 @@ async def server_stats(self: Client, ctx: Message) -> "Message":
|
|||
os.remove("stats.png")
|
||||
|
||||
|
||||
# Gban
|
||||
@app.on_message(filters.command("gban", COMMAND_HANDLER) & filters.user(SUDO))
|
||||
async def ban_globally(self: Client, ctx: Message):
|
||||
user_id, reason = await extract_user_and_reason(ctx)
|
||||
user = await app.get_users(user_id)
|
||||
from_user = ctx.from_user
|
||||
|
||||
if not user_id:
|
||||
return await ctx.reply_text("I can't find that user.")
|
||||
if not reason:
|
||||
return await ctx.reply("No reason provided.")
|
||||
|
||||
if user_id in [from_user.id, self.me.id] or user_id in SUDO:
|
||||
return await ctx.reply_text("I can't ban that user.")
|
||||
served_chats = await db.get_all_chats()
|
||||
m = await ctx.reply_text(
|
||||
f"**Banning {user.mention} Globally!**"
|
||||
+ f" **This Action Should Take About {len(served_chats)} Seconds.**"
|
||||
)
|
||||
await add_gban_user(user_id)
|
||||
number_of_chats = 0
|
||||
for served_chat in served_chats:
|
||||
try:
|
||||
await app.ban_chat_member(served_chat["chat_id"], user.id)
|
||||
number_of_chats += 1
|
||||
await asyncio.sleep(1)
|
||||
except FloodWait as e:
|
||||
await asyncio.sleep(int(e.value))
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await app.send_message(
|
||||
user.id,
|
||||
f"Hello, You have been globally banned by {from_user.mention},"
|
||||
+ " You can appeal for this ban by talking to him.",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
await m.edit(f"Banned {user.mention} Globally!")
|
||||
ban_text = f"""
|
||||
__**New Global Ban**__
|
||||
**Origin:** {message.chat.title} [`{message.chat.id}`]
|
||||
**Admin:** {from_user.mention}
|
||||
**Banned User:** {user.mention}
|
||||
**Banned User ID:** `{user_id}`
|
||||
**Reason:** __{reason}__
|
||||
**Chats:** `{number_of_chats}`"""
|
||||
try:
|
||||
m2 = await app.send_message(
|
||||
LOG_CHANNEL,
|
||||
text=ban_text,
|
||||
disable_web_page_preview=True,
|
||||
)
|
||||
await m.edit(
|
||||
f"Banned {user.mention} Globally!\nAction Log: {m2.link}",
|
||||
disable_web_page_preview=True,
|
||||
)
|
||||
except Exception:
|
||||
await ctx.reply_text(
|
||||
"User Gbanned, But This Gban Action Wasn't Logged, Add Me In LOG_CHANNEL"
|
||||
)
|
||||
|
||||
|
||||
# Ungban
|
||||
@app.on_message(filters.command("ungban", COMMAND_HANDLER) & filters.user(SUDO))
|
||||
async def unban_globally(self: Client, ctx: Message):
|
||||
user_id = await extract_user(ctx)
|
||||
if not user_id:
|
||||
return await message.reply_text("I can't find that user.")
|
||||
user = await app.get_users(user_id)
|
||||
|
||||
is_gbanned = await is_gbanned_user(user.id)
|
||||
if not is_gbanned:
|
||||
await ctx.reply_text("I don't remember Gbanning him.")
|
||||
else:
|
||||
await remove_gban_user(user.id)
|
||||
await ctx.reply_text(f"Lifted {user.mention}'s Global Ban.'")
|
||||
|
||||
|
||||
@app.on_message(filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO))
|
||||
@app.on_edited_message(filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO))
|
||||
@user.on_message(filters.command(["shell", "sh", "term"], ".") & filters.me)
|
||||
|
|
|
|||
Loading…
Reference in a new issue