diff --git a/database/feds_db.py b/database/feds_db.py new file mode 100644 index 00000000..7a26aef3 --- /dev/null +++ b/database/feds_db.py @@ -0,0 +1,179 @@ +import pytz +from datetime import datetime +from misskaty.vars import SUDO +from database import dbname + + +fedsdb = dbname["federation"] + + +def get_fed_info(fed_id): + get = fedsdb.find_one({"fed_id": str(fed_id)}) + if get is None: + return False + return get + + +async def get_fed_id(chat_id): + get = await fedsdb.find_one({"chat_ids.chat_id": int(chat_id)}) + + if get is None: + return False + else: + for chat_info in get.get("chat_ids", []): + if chat_info["chat_id"] == int(chat_id): + return get["fed_id"] + + return False + + +async def get_feds_by_owner(owner_id): + cursor = fedsdb.find({"owner_id": owner_id}) + feds = await cursor.to_list(length=None) + if not feds: + return False + federations = [ + {"fed_id": fed["fed_id"], "fed_name": fed["fed_name"]} for fed in feds + ] + return federations + + +async def transfer_owner(fed_id, current_owner_id, new_owner_id): + if await is_user_fed_owner(fed_id, current_owner_id): + await fedsdb.update_one( + {"fed_id": fed_id, "owner_id": current_owner_id}, + {"$set": {"owner_id": new_owner_id}}, + ) + return True + else: + return False + + +async def set_log_chat(fed_id, log_group_id: int): + await fedsdb.update_one( + {"fed_id": fed_id}, {"$set": {"log_group_id": log_group_id}} + ) + return + + +async def get_fed_name(chat_id): + get = await fedsdb.find_one(int(chat_id)) + if get is None: + return False + else: + return get["fed_name"] + + +async def is_user_fed_owner(fed_id, user_id: int): + getfed = await get_fed_info(fed_id) + if not getfed: + return False + owner_id = getfed["owner_id"] + if user_id == owner_id or user_id not in SUDO: + return True + else: + return False + + +async def search_fed_by_id(fed_id): + get = await fedsdb.find_one({"fed_id": str(fed_id)}) + + if get is not None: + return get + else: + return False + + +def chat_join_fed(fed_id, chat_name, chat_id): + return fedsdb.update_one( + {"fed_id": fed_id}, + {"$push": {"chat_ids": {"chat_id": int(chat_id), "chat_name": chat_name}}}, + ) + + +async def chat_leave_fed(chat_id): + result = await fedsdb.update_one( + {"chat_ids.chat_id": int(chat_id)}, + {"$pull": {"chat_ids": {"chat_id": int(chat_id)}}}, + ) + if result.modified_count > 0: + return True + else: + return False + + +async def user_join_fed(fed_id, user_id): + result = await fedsdb.update_one( + {"fed_id": fed_id}, {"$addToSet": {"fadmins": int(user_id)}}, upsert=True + ) + if result.modified_count > 0: + return True + else: + return False + + +async def user_demote_fed(fed_id, user_id): + result = await fedsdb.update_one( + {"fed_id": fed_id}, {"$pull": {"fadmins": int(user_id)}} + ) + if result.modified_count > 0: + return True + else: + return False + + +async def search_user_in_fed(fed_id, user_id): + getfed = await search_fed_by_id(fed_id) + if getfed is None: + return False + fadmins = getfed["fadmins"] + if user_id in fadmins: + return True + else: + return False + + +async def chat_id_and_names_in_fed(fed_id): + getfed = await search_fed_by_id(fed_id) + + if getfed is None or "chat_ids" not in getfed: + return [], [] + + chat_ids = [chat["chat_id"] for chat in getfed["chat_ids"]] + chat_names = [chat["chat_name"] for chat in getfed["chat_ids"]] + return chat_ids, chat_names + + +async def add_fban_user(fed_id, user_id, reason): + current_date = datetime.now(pytz.timezone("Asia/Jakarta")).strftime( + "%Y-%m-%d %H:%M" + ) + await fedsdb.update_one( + {"fed_id": fed_id}, + { + "$push": { + "banned_users": { + "user_id": int(user_id), + "reason": reason, + "date": current_date, + } + } + }, + upsert=True, + ) + + +async def remove_fban_user(fed_id, user_id): + await fedsdb.update_one( + {"fed_id": fed_id}, {"$pull": {"banned_users": {"user_id": int(user_id)}}} + ) + + +async def check_banned_user(fed_id, user_id): + result = await fedsdb.find_one({"fed_id": fed_id, "banned_users.user_id": user_id}) + if result and "banned_users" in result: + for user in result["banned_users"]: + if user.get("user_id") == user_id: + return {"reason": user.get("reason"), "date": user.get("date")} + + return False diff --git a/misskaty/__init__.py b/misskaty/__init__.py index d43acfdd..6792f6fb 100644 --- a/misskaty/__init__.py +++ b/misskaty/__init__.py @@ -42,7 +42,7 @@ MOD_NOLOAD = ["subscene_dl"] HELPABLE = {} cleanmode = {} botStartTime = time.time() -misskaty_version = "v2.12.2 - Stable" +misskaty_version = "v2.13" uvloop.install() faulthandler_enable() diff --git a/misskaty/core/misskaty_patch/bound/message.py b/misskaty/core/misskaty_patch/bound/message.py index abd15221..18d487ab 100644 --- a/misskaty/core/misskaty_patch/bound/message.py +++ b/misskaty/core/misskaty_patch/bound/message.py @@ -7,6 +7,7 @@ from typing import Union from pyrogram.errors import ( ChatAdminRequired, + ChannelPrivate, ChatSendPlainForbidden, ChatWriteForbidden, FloodWait, @@ -91,7 +92,7 @@ async def reply_text( LOGGER.warning(f"Got floodwait in {self.chat.id} for {e.value}'s.") await asleep(e.value) return await reply_text(self, text, *args, **kwargs) - except TopicClosed: + except (TopicClosed, ChannelPrivate): return except (ChatWriteForbidden, ChatAdminRequired, ChatSendPlainForbidden): LOGGER.info( @@ -138,7 +139,7 @@ async def edit_text( LOGGER.warning(f"Got floodwait in {self.chat.id} for {e.value}'s.") await asleep(e.value) return await edit_text(self, text, *args, **kwargs) - except MessageNotModified: + except (MessageNotModified, ChannelPrivate): return False except (ChatWriteForbidden, ChatAdminRequired): LOGGER.info( diff --git a/misskaty/core/misskaty_patch/methods/edit_message_text.py b/misskaty/core/misskaty_patch/methods/edit_message_text.py index 019a8344..b35edfd1 100644 --- a/misskaty/core/misskaty_patch/methods/edit_message_text.py +++ b/misskaty/core/misskaty_patch/methods/edit_message_text.py @@ -2,6 +2,7 @@ import asyncio from typing import Union from pyrogram import Client +from pyrogram.types import Message async def edit_message_text( diff --git a/misskaty/helper/functions.py b/misskaty/helper/functions.py index eed1d9f1..b8923e51 100644 --- a/misskaty/helper/functions.py +++ b/misskaty/helper/functions.py @@ -4,6 +4,7 @@ from re import sub as re_sub from string import ascii_lowercase from pyrogram import enums +from pyrogram.types import Message from misskaty import app @@ -106,7 +107,7 @@ async def extract_user(message): return (await extract_user_and_reason(message))[0] -async def time_converter(message, time_value: str) -> int: +async def time_converter(message: Message, time_value: str) -> datetime: unit = ["m", "h", "d"] # m == minutes | h == hours | d == days check_unit = "".join(list(filter(time_value[-1].lower().endswith, unit))) currunt_time = datetime.now() @@ -121,7 +122,7 @@ async def time_converter(message, time_value: str) -> int: temp_time = currunt_time + timedelta(days=int(time_digit)) else: return await message.reply_text("Incorrect time specified.") - return int(datetime.timestamp(temp_time)) + return temp_time def extract_text_and_keyb(ikb, text: str, row_width: int = 2): diff --git a/misskaty/plugins/admin.py b/misskaty/plugins/admin.py index 345fa9bf..80495539 100644 --- a/misskaty/plugins/admin.py +++ b/misskaty/plugins/admin.py @@ -194,6 +194,8 @@ async def kickFunc(client: Client, ctx: Message, strings) -> "Message": await ctx.chat.unban_member(user_id) except ChatAdminRequired: await ctx.reply_msg(strings("no_ban_permission")) + except Exception as e: + await ctx.reply_msg(str(e)) # Ban/DBan/TBan User @@ -258,6 +260,8 @@ async def banFunc(client, message, strings): await message.reply_msg(msg, reply_markup=keyboard) except ChatAdminRequired: await message.reply("Please give me permission to banned members..!!!") + except Exception as e: + await message.reply_msg(str(e)) # Unban members @@ -288,6 +292,8 @@ async def unban_func(_, message, strings): await message.reply_msg(strings("unknown_id", context="general")) except ChatAdminRequired: await message.reply("Please give me permission to unban members..!!!") + except Exception as e: + await message.reply_msg(str(e)) # Ban users listed in a message @@ -482,6 +488,8 @@ async def demote(client, message, strings): await message.reply_text(f"Demoted! {umention}") except ChatAdminRequired: await message.reply("Please give permission to demote members..") + except Exception as e: + await message.reply_msg(str(e)) # Pin Messages @@ -509,6 +517,8 @@ async def pin(_, message, strings): strings("pin_no_perm"), disable_web_page_preview=True, ) + except Exception as e: + await message.reply_msg(str(e)) # Mute members @@ -557,8 +567,11 @@ async def mute(client, message, strings): return if reason: msg += strings("banned_reason").format(reas=reason) - await message.chat.restrict_member(user_id, permissions=ChatPermissions(all_perms=False)) - await message.reply_text(msg, reply_markup=keyboard) + try: + await message.chat.restrict_member(user_id, permissions=ChatPermissions(all_perms=False)) + await message.reply_text(msg, reply_markup=keyboard) + except Exception as e: + await message.reply_msg(str(e)) # Unmute members @@ -665,8 +678,11 @@ async def unmute_user(client, cq, strings): text = cq.message.text.markdown text = f"~~{text}~~\n\n" text += strings("rmmute_msg").format(mention=from_user.mention) - await cq.message.chat.unban_member(user_id) - await cq.message.edit(text) + try: + await cq.message.chat.unban_member(user_id) + await cq.message.edit(text) + except Exception as e: + await cq.answer(str(e)) @app.on_callback_query(filters.regex("unban_")) @@ -787,10 +803,13 @@ async def set_chat_title(_, ctx: Message): return await ctx.reply_text(f"**Usage:**\n/{ctx.command[0]} NEW NAME") old_title = ctx.chat.title new_title = ctx.text.split(None, 1)[1] - await ctx.chat.set_title(new_title) - await ctx.reply_text( - f"Successfully Changed Group Title From {old_title} To {new_title}" - ) + try: + await ctx.chat.set_title(new_title) + await ctx.reply_text( + f"Successfully Changed Group Title From {old_title} To {new_title}" + ) + except Exception as e: + await ctx.reply_msg(str(e)) @app.on_cmd("set_user_title", self_admin=True, group_only=True) @@ -807,10 +826,13 @@ async def set_user_title(_, ctx: Message): "**Usage:**\n/set_user_title NEW ADMINISTRATOR TITLE" ) title = ctx.text.split(None, 1)[1] - await app.set_administrator_title(chat_id, from_user.id, title) - await ctx.reply_text( - f"Successfully Changed {from_user.mention}'s Admin Title To {title}" - ) + try: + await app.set_administrator_title(chat_id, from_user.id, title) + await ctx.reply_text( + f"Successfully Changed {from_user.mention}'s Admin Title To {title}" + ) + except Exception as e: + await ctx.reply_msg(str(e)) @app.on_cmd("set_chat_photo", self_admin=True, group_only=True) diff --git a/misskaty/plugins/dev.py b/misskaty/plugins/dev.py index 057e45c2..348e6a84 100644 --- a/misskaty/plugins/dev.py +++ b/misskaty/plugins/dev.py @@ -326,7 +326,7 @@ async def unban_globally(_, ctx: Message): filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO) ) @app.on_edited_message( - filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO) + filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO) & ~filters.react ) @user.on_message(filters.command(["shell", "sh", "term"], ".") & filters.me) @use_chat_lang() @@ -389,7 +389,7 @@ async def shell_cmd(_, ctx: Message, strings): ) @app.on_edited_message( (filters.command(["ev", "run", "meval"], COMMAND_HANDLER) | filters.regex(r"app.run\(\)$")) - & filters.user(SUDO) + & filters.user(SUDO) & ~filters.react ) @user.on_message(filters.command(["ev", "run", "meval"], ".") & filters.me) @use_chat_lang() @@ -436,7 +436,7 @@ async def cmd_eval(self: Client, ctx: Message, strings) -> Optional[str]: "cloudscraper": cloudscraper, "json": json, "aiohttp": aiohttp, - "p": _print, + "print": _print, "send": send, "stdout": out_buf, "traceback": traceback, diff --git a/misskaty/plugins/federation.py b/misskaty/plugins/federation.py new file mode 100644 index 00000000..53d70b3f --- /dev/null +++ b/misskaty/plugins/federation.py @@ -0,0 +1,970 @@ +""" +MIT License + +Copyright (c) 2023 SI_NN_ER_LS +Copyright (c) 2023 MissKatyPyro + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import uuid +import asyncio +from database.feds_db import * + +from misskaty import app, BOT_ID +from misskaty.vars import SUDO, LOG_GROUP_ID, COMMAND_HANDLER + +from pyrogram import filters +from pyrogram.enums import ChatMemberStatus, ChatType, ParseMode +from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup + +from misskaty.helper.functions import extract_user, extract_user_and_reason +from pyrogram.errors import FloodWait +from misskaty.core.decorator.errors import capture_err + +__MODULE__ = "Federation" +__HELP__ = """ +Everything is fun, until a spammer starts entering your group, and you have to block it. Then you need to start banning more, and more, and it hurts. +But then you have many groups, and you don't want this spammer to be in one of your groups - how can you deal? Do you have to manually block it, in all your groups?\n +**No longer!** With Federation, you can make a ban in one chat overlap with all other chats.\n +You can even designate federation admins, so your trusted admin can ban all the spammers from chats you want to protect.\n\n +""" + +SUPPORT_CHAT = "@YasirArisM" + + +@app.on_message(filters.command("newfed", COMMAND_HANDLER)) +@capture_err +async def new_fed(self, message): + chat = message.chat + user = message.from_user + if message.chat.type != ChatType.PRIVATE: + return await message.reply_text( + "Federations can only be created by privately messaging me." + ) + if len(message.command) < 2: + return await message.reply_text("Please write the name of the federation!") + fednam = message.text.split(None, 1)[1] + if not fednam == "": + fed_id = str(uuid.uuid4()) + fed_name = fednam + x = await fedsdb.update_one( + {"fed_id": str(fed_id)}, + { + "$set": { + "fed_name": str(fed_name), + "owner_id": int(user.id), + "fadmins": [], + "owner_mention": user.mention, + "banned_users": [], + "chat_ids": [], + "log_group_id": LOG_GROUP_ID, + } + }, + upsert=True, + ) + if not x: + return await message.reply_text( + f"Can't federate! Please contact {SUPPORT_CHAT} if the problem persist." + ) + + await message.reply_text( + "**You have succeeded in creating a new federation!**" + "\nName: `{}`" + "\nID: `{}`" + "\n\nUse the command below to join the federation:" + "\n`/joinfed {}`".format(fed_name, fed_id, fed_id), + parse_mode=ParseMode.MARKDOWN, + ) + try: + await app.send_message( + LOG_GROUP_ID, + "New Federation: {}\nID:
{}".format(fed_name, fed_id),
+ parse_mode=ParseMode.HTML,
+ )
+ except:
+ self.log.info("Cannot send a message to EVENT_LOGS")
+ else:
+ await message.reply_text("Please write down the name of the federation")
+
+
+@app.on_message(filters.command("delfed", COMMAND_HANDLER))
+@capture_err
+async def del_fed(client, message):
+ chat = message.chat
+ user = message.from_user
+ if message.chat.type != ChatType.PRIVATE:
+ return await message.reply_text(
+ "Federations can only be deleted by privately messaging me."
+ )
+ args = message.text.split(" ", 1)
+ if len(args) > 1:
+ is_fed_id = args[1].strip()
+ getinfo = await get_fed_info(is_fed_id)
+ if getinfo is False:
+ return await message.reply_text("This federation does not exist.")
+ if getinfo["owner_id"] == user.id or user.id not in SUDO:
+ fed_id = is_fed_id
+ else:
+ return await message.reply_text("Only federation owners can do this!")
+ else:
+ return await message.reply_text("What should I delete?")
+ is_owner = await is_user_fed_owner(fed_id, user.id)
+ if is_owner is False:
+ return await message.reply_text("Only federation owners can do this!")
+ await message.reply_text(
+ "You sure you want to delete your federation? This cannot be reverted, you will lose your entire ban list, and '{}' will be permanently lost.".format(
+ getinfo["fed_name"]
+ ),
+ reply_markup=InlineKeyboardMarkup(
+ [
+ [
+ InlineKeyboardButton(
+ "⚠️ Delete Federation ⚠️",
+ callback_data=f"rmfed_{fed_id}",
+ )
+ ],
+ [InlineKeyboardButton("Cancel", callback_data="rmfed_cancel")],
+ ]
+ ),
+ )
+
+
+@app.on_message(filters.command("fedtransfer", COMMAND_HANDLER))
+@capture_err
+async def fedtransfer(client, message):
+ chat = message.chat
+ user = message.from_user
+ if message.chat.type != ChatType.PRIVATE:
+ return await message.reply_text(
+ "Federations can only be transferred by privately messaging me."
+ )
+ is_feds = await get_feds_by_owner(int(user.id))
+ if not is_feds:
+ return await message.reply_text("**You haven't created any federations.**")
+ if len(message.command) < 2:
+ return await message.reply_text(
+ "**You needed to specify a user or reply to their message!**"
+ )
+ user_id, fed_id = await extract_user_and_reason(message)
+ if not user_id:
+ return await message.reply_text("I can't find that user.")
+ if not fed_id:
+ return await message.reply(
+ "you need to provide a Fed Id.\n\nUsage:\n/fedtransfer @usename Fed_Id."
+ )
+ is_owner = await is_user_fed_owner(fed_id, user.id)
+ if is_owner is False:
+ return await message.reply_text("Only federation owners can do this!")
+ await message.reply_text(
+ "**You sure you want to transfer your federation? This cannot be reverted.**",
+ reply_markup=InlineKeyboardMarkup(
+ [
+ [
+ InlineKeyboardButton(
+ "⚠️ Transfer Federation ⚠️",
+ callback_data=f"trfed_{user_id}|{fed_id}",
+ )
+ ],
+ [InlineKeyboardButton("Cancel", callback_data="trfed_cancel")],
+ ]
+ ),
+ )
+
+
+@app.on_message(filters.command("myfeds", COMMAND_HANDLER))
+@capture_err
+async def myfeds(client, message):
+ user = message.from_user
+ is_feds = await get_feds_by_owner(int(user.id))
+
+ if is_feds:
+ response_text = "\n\n".join(
+ [
+ f"{i + 1}.\n**Fed Name:** {fed['fed_name']}\n**Fed Id:** `{fed['fed_id']}`"
+ for i, fed in enumerate(is_feds)
+ ]
+ )
+ await message.reply_text(
+ f"**Here are the federations you have created:**\n\n{response_text}"
+ )
+ else:
+ await message.reply_text("**You haven't created any federations.**")
+
+
+@app.on_message(filters.command("renamefed", COMMAND_HANDLER))
+@capture_err
+async def rename_fed(client, message):
+ user = message.from_user
+ msg = message
+ args = msg.text.split(None, 2)
+
+ if len(args) < 3:
+ return await msg.reply_text("usage: /renamefed {})".format(info["fed_name"], fed_id)
+
+ await message.reply_text(text, parse_mode=ParseMode.HTML)
+
+
+@app.on_message(filters.command("joinfed", COMMAND_HANDLER))
+@capture_err
+async def join_fed(self, message):
+ chat = message.chat
+ user = message.from_user
+ if message.chat.type == ChatType.PRIVATE:
+ return await message.reply_text(
+ "This command is specific to groups, not our pm!",
+ )
+
+ member = await self.get_chat_member(chat.id, user.id)
+ fed_id = await get_fed_id(int(chat.id))
+
+ if user.id in SUDO:
+ pass
+ else:
+ if member.status == ChatMemberStatus.OWNER:
+ pass
+ else:
+ return await message.reply_text("Only group creators can use this command!")
+ if fed_id:
+ return await message.reply_text("You cannot join two federations from one chat")
+ args = message.text.split(" ", 1)
+ if len(args) > 1:
+ fed_id = args[1].strip()
+ getfed = await search_fed_by_id(fed_id)
+ if getfed is False:
+ return await message.reply_text("Please enter a valid federation ID")
+
+ x = await chat_join_fed(fed_id, chat.title, chat.id)
+ if not x:
+ return await message.reply_text(
+ f"Failed to join federation! Please contact {SUPPORT_CHAT} if this problem persists!"
+ )
+
+ get_fedlog = getfed["log_group_id"]
+ if get_fedlog:
+ await app.send_message(
+ get_fedlog,
+ "Chat **{}** has joined the federation **{}**".format(
+ chat.title, getfed["fed_name"]
+ ),
+ parse_mode=ParseMode.MARKDOWN,
+ )
+
+ await message.reply_text(
+ "This group has joined the federation: {}!".format(getfed["fed_name"])
+ )
+ else:
+ await message.reply_text(
+ "You need to specify which federation you're asking about by giving me a FedID!"
+ )
+
+
+@app.on_message(filters.command("leavefed", COMMAND_HANDLER))
+@capture_err
+async def leave_fed(client, message):
+ chat = message.chat
+ user = message.from_user
+
+ if message.chat.type == ChatType.PRIVATE:
+ return await message.reply_text(
+ "This command is specific to groups, not our pm!",
+ )
+
+ fed_id = await get_fed_id(int(chat.id))
+ fed_info = await get_fed_info(fed_id)
+
+ member = await app.get_chat_member(chat.id, user.id)
+ if member.status == ChatMemberStatus.OWNER or user.id in SUDO:
+ if await chat_leave_fed(int(chat.id)) is True:
+ get_fedlog = fed_info["log_group_id"]
+ if get_fedlog:
+ await app.send_message(
+ get_fedlog,
+ "Chat **{}** has left the federation **{}**".format(
+ chat.title, fed_info["fed_name"]
+ ),
+ parse_mode=ParseMode.MARKDOWN,
+ )
+ await message.reply_text(
+ "This group has left the federation {}!".format(fed_info["fed_name"]),
+ )
+ else:
+ await message.reply_text(
+ "How can you leave a federation that you never joined?!"
+ )
+ else:
+ await message.reply_text("Only group creators can use this command!")
+
+
+@app.on_message(filters.command("fedchats", COMMAND_HANDLER))
+@capture_err
+async def fed_chat(client, message):
+ chat = message.chat
+ user = message.from_user
+ if message.chat.type != ChatType.PRIVATE:
+ return await message.reply_text(
+ "Fedchats can only be checked by privately messaging me."
+ )
+ if len(message.command) < 2:
+ return await message.reply_text(
+ "Please write the Id of the federation!\n\nUsage:\n/fedchats fed_id"
+ )
+ args = message.text.split(" ", 1)
+ if len(args) > 1:
+ fed_id = args[1].strip()
+ info = await get_fed_info(fed_id)
+ if info is False:
+ return await message.reply_text("This federation does not exist.")
+ fed_owner = info["owner_id"]
+ fed_admins = info["fadmins"]
+ all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
+ if user.id in all_admins or user.id in SUDO:
+ pass
+ else:
+ return await message.reply_text(
+ "You need to be a Fed Admin to use this command"
+ )
+
+ chat_ids, chat_names = await chat_id_and_names_in_fed(fed_id)
+ if not chat_ids:
+ return await message.reply_text("There are no chats in this federation!")
+ text = "\n".join(
+ [
+ f"${chat_name} [`{chat_id}`]"
+ for chat_id, chat_name in zip(chat_ids, chat_names)
+ ]
+ )
+ await message.reply_text(
+ f"**Here are the list of chats connected to this federation:**\n\n{text}"
+ )
+
+
+@app.on_message(filters.command("fedinfo", COMMAND_HANDLER))
+@capture_err
+async def fed_info(client, message):
+ if len(message.command) < 2:
+ return await message.reply_text("Please provide the Fed Id to get information!")
+
+ fed_id = message.text.split(" ", 1)[1].strip()
+ fed_info = await get_fed_info(fed_id)
+
+ if not fed_info:
+ return await message.reply_text("Federation not found.")
+
+ fed_name = fed_info.get("fed_name")
+ owner_mention = fed_info.get("owner_mention")
+ fadmin_count = len(fed_info.get("fadmins", []))
+ banned_users_count = len(fed_info.get("banned_users", []))
+ chat_ids_count = len(fed_info.get("chat_ids", []))
+
+ reply_text = (
+ f"**Federation Information:**\n\n"
+ f"**Fed Name:** {fed_name}\n"
+ f"**Owner:** {owner_mention}\n"
+ f"**Number of Fed Admins:** {fadmin_count}\n"
+ f"**Number of Banned Users:** {banned_users_count}\n"
+ f"**Number of Chats:** {chat_ids_count}"
+ )
+
+ await message.reply_text(reply_text)
+
+
+@app.on_message(filters.command("fedadmins", COMMAND_HANDLER))
+@capture_err
+async def get_all_fadmins_mentions(client, message):
+ if len(message.command) < 2:
+ return await message.reply_text("Please provide me the Fed Id to search!")
+
+ fed_id = message.text.split(" ", 1)[1].strip()
+ fed_info = await get_fed_info(fed_id)
+ if not fed_info:
+ return await message.reply_text("Federation not found.")
+
+ fadmin_ids = fed_info.get("fadmins", [])
+ if not fadmin_ids:
+ return await message.reply_text(f"**Owner: {fed_info['owner_mention']}\n\nNo fadmins found in the federation.")
+
+ user_mentions = []
+ for user_id in fadmin_ids:
+ try:
+ user = await app.get_users(int(user_id))
+ user_mentions.append(f"● {user.mention}[`{user.id}`]")
+ except Exception:
+ user_mentions.append(f"● `Admin🥷`[`{user_id}`]")
+ reply_text = f"**Owner: {fed_info['owner_mention']}\n\nList of fadmins:**\n" + "\n".join(user_mentions)
+ await message.reply_text(reply_text)
+
+
+@app.on_message(filters.command("fpromote", COMMAND_HANDLER))
+@capture_err
+async def fpromote(client, message):
+ chat = message.chat
+ user = message.from_user
+ msg = message
+
+ if message.chat.type == ChatType.PRIVATE:
+ return await message.reply_text(
+ "This command is specific to groups, not our pm!",
+ )
+
+ fed_id = await get_fed_id(chat.id)
+ if not fed_id:
+ return await message.reply_text(
+ "You need to add a federation to this chat first!"
+ )
+
+ if await is_user_fed_owner(fed_id, user.id) or user.id in SUDO:
+ user_id = await extract_user(msg)
+
+ if user_id is None:
+ return await message.reply_text("Failed to extract user from the message.")
+ check_user = await check_banned_user(fed_id, user_id)
+ if check_user:
+ user = await app.get_users(user_id)
+ reason = check_user["reason"]
+ date = check_user["date"]
+ return await message.reply_text(
+ f"**User {user.mention} was Fed Banned.\nyou can unban the user and promote.\n\nReason: {reason}.\nDate: {date}.**"
+ )
+
+ getuser = await search_user_in_fed(fed_id, user_id)
+ info = await get_fed_info(fed_id)
+ get_owner = info["owner_id"]
+
+ if user_id == get_owner:
+ return await message.reply_text(
+ "You do know that the user is the federation owner, right? RIGHT?"
+ )
+
+ if getuser:
+ return await message.reply_text(
+ "I cannot promote users who are already federation admins! Can remove them if you want!"
+ )
+
+ if user_id == BOT_ID:
+ return await message.reply_text(
+ "I already am a federation admin in all federations!"
+ )
+ res = await user_join_fed(str(fed_id), user_id)
+ if res:
+ await message.reply_text("Successfully Promoted!")
+ else:
+ await message.reply_text("Failed to promote!")
+ else:
+ await message.reply_text("Only federation owners can do this!")
+
+
+@app.on_message(filters.command("fdemote", COMMAND_HANDLER))
+@capture_err
+async def fdemote(client, message):
+ chat = message.chat
+ user = message.from_user
+ msg = message
+
+ if message.chat.type == ChatType.PRIVATE:
+ return await message.reply_text(
+ "This command is specific to groups, not our pm!",
+ )
+
+ fed_id = await get_fed_id(chat.id)
+ if not fed_id:
+ return await message.reply_text(
+ "You need to add a federation to this chat first!"
+ )
+
+ if await is_user_fed_owner(fed_id, user.id) or user.id in SUDO:
+ user_id = await extract_user(msg)
+
+ if user_id is None:
+ return await message.reply_text("Failed to extract user from the message.")
+
+ if user_id == BOT_ID:
+ return await message.reply_text(
+ "The thing you are trying to demote me from will fail to work without me! Just saying."
+ )
+
+ if await search_user_in_fed(fed_id, user_id) is False:
+ return await message.reply_text(
+ "I cannot demote people who are not federation admins!"
+ )
+
+ res = await user_demote_fed(fed_id, user_id)
+ if res is True:
+ await message.reply_text("Demoted from a Fed Admin!")
+ else:
+ await message.reply_text("Demotion failed!")
+ else:
+ await message.reply_text("Only federation owners can do this!")
+
+
+@app.on_message(filters.command(["fban", "sfban"], COMMAND_HANDLER))
+@capture_err
+async def fban_user(client, message):
+ chat = message.chat
+ from_user = message.from_user
+ if message.chat.type == ChatType.PRIVATE:
+ await message.reply_text("This command is specific to groups, not our pm!.")
+ return
+ fed_id = await get_fed_id(chat.id)
+ if not fed_id:
+ return await message.reply_text("**This chat is not a part of any federation.")
+ info = await get_fed_info(fed_id)
+ fed_owner = info["owner_id"]
+ fed_admins = info["fadmins"]
+ all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
+ if from_user.id in all_admins or from_user.id in SUDO:
+ pass
+ else:
+ return await message.reply_text(
+ "You need to be a Fed Admin to use this command"
+ )
+ if len(message.command) < 2:
+ return await message.reply_text(
+ "**You needed to specify a user or reply to their message!**"
+ )
+ user_id, reason = await extract_user_and_reason(message)
+ user = await app.get_users(user_id)
+ if not user_id:
+ return await message.reply_text("I can't find that user.")
+ if user_id in all_admins or user_id in SUDO:
+ return await message.reply_text("I can't ban that user.")
+ check_user = await check_banned_user(fed_id, user_id)
+ if check_user:
+ reason = check_user["reason"]
+ date = check_user["date"]
+ return await message.reply_text(
+ f"**User {user.mention} was already Fed Banned.\n\nReason: {reason}.\nDate: {date}.**"
+ )
+ if not reason:
+ return await message.reply("No reason provided.")
+
+ served_chats, _ = await chat_id_and_names_in_fed(fed_id)
+ m = await message.reply_text(
+ f"**Fed Banning {user.mention}!**"
+ + f" **This Action Should Take About {len(served_chats)} Seconds.**"
+ )
+ await add_fban_user(fed_id, user_id, reason)
+ number_of_chats = 0
+ for served_chat in served_chats:
+ try:
+ chat_member = await app.get_chat_member(served_chat, user.id)
+ if chat_member.status == ChatMemberStatus.MEMBER:
+ await app.ban_chat_member(served_chat, user.id)
+ if served_chat != chat.id:
+ if not message.text.startswith("/s"):
+ await app.send_message(
+ served_chat, f"**Fed Banned {user.mention} !**"
+ )
+ number_of_chats += 1
+ await asyncio.sleep(1)
+ except FloodWait as e:
+ await asyncio.sleep(int(e.value))
+ except Exception:
+ pass
+ try:
+ await app.send_message(
+ user.id,
+ f"Hello, You have been fed banned by {from_user.mention},"
+ + " You can appeal for this ban by talking to him.",
+ )
+ except Exception:
+ pass
+ await m.edit(f"Fed Banned {user.mention} !")
+ ban_text = f"""
+__**New Federation Ban**__
+**Origin:** {message.chat.title} [`{message.chat.id}`]
+**Admin:** {from_user.mention}
+**Banned User:** {user.mention}
+**Banned User ID:** `{user_id}`
+**Reason:** __{reason}__
+**Chats:** `{number_of_chats}`"""
+ try:
+ m2 = await app.send_message(
+ info["log_group_id"],
+ text=ban_text,
+ disable_web_page_preview=True,
+ )
+ await m.edit(
+ f"Fed Banned {user.mention} !\nAction Log: {m2.link}",
+ disable_web_page_preview=True,
+ )
+ except Exception:
+ await message.reply_text(
+ "User Fbanned, But This Fban Action Wasn't Logged, Add Me In LOG_GROUP"
+ )
+
+
+@app.on_message(filters.command(["unfban", "sunfban"], COMMAND_HANDLER))
+@capture_err
+async def funban_user(client, message):
+ chat = message.chat
+ from_user = message.from_user
+ if message.chat.type == ChatType.PRIVATE:
+ await message.reply_text("This command is specific to groups, not our pm!.")
+ return
+ fed_id = await get_fed_id(chat.id)
+ if not fed_id:
+ return await message.reply_text("**This chat is not a part of any federation.")
+ info = await get_fed_info(fed_id)
+ fed_owner = info["owner_id"]
+ fed_admins = info["fadmins"]
+ all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
+ if from_user.id in all_admins or from_user.id in SUDO:
+ pass
+ else:
+ return await message.reply_text(
+ "You need to be a Fed Admin to use this command"
+ )
+ if len(message.command) < 2:
+ return await message.reply_text(
+ "**You needed to specify a user or reply to their message!**"
+ )
+ user_id, reason = await extract_user_and_reason(message)
+ user = await app.get_users(user_id)
+ if not user_id:
+ return await message.reply_text("I can't find that user.")
+ if user_id in all_admins or user_id in SUDO:
+ return await message.reply_text("**How can an admin ever be banned!.**")
+ check_user = await check_banned_user(fed_id, user_id)
+ if not check_user:
+ return await message.reply_text(
+ "**I can't unban a user who was never fedbanned.**"
+ )
+ if not reason:
+ return await message.reply("No reason provided.")
+
+ served_chats, _ = await chat_id_and_names_in_fed(fed_id)
+ m = await message.reply_text(
+ f"**Fed UnBanning {user.mention}!**"
+ + f" **This Action Should Take About {len(served_chats)} Seconds.**"
+ )
+ await remove_fban_user(fed_id, user_id)
+ number_of_chats = 0
+ for served_chat in served_chats:
+ try:
+ chat_member = await app.get_chat_member(served_chat, user.id)
+ if chat_member.status == ChatMemberStatus.BANNED:
+ await app.unban_chat_member(served_chat, user.id)
+ if served_chat != chat.id:
+ if not message.text.startswith("/s"):
+ await app.send_message(
+ served_chat, f"**Fed UnBanned {user.mention} !**"
+ )
+ number_of_chats += 1
+ await asyncio.sleep(1)
+ except FloodWait as e:
+ await asyncio.sleep(int(e.value))
+ except Exception:
+ pass
+ try:
+ await app.send_message(
+ user.id,
+ f"Hello, You have been fed unbanned by {from_user.mention},"
+ + " You can thank him for his action.",
+ )
+ except Exception:
+ pass
+ await m.edit(f"Fed UnBanned {user.mention} !")
+ ban_text = f"""
+__**New Federation UnBan**__
+**Origin:** {message.chat.title} [`{message.chat.id}`]
+**Admin:** {from_user.mention}
+**UnBanned User:** {user.mention}
+**UnBanned User ID:** `{user_id}`
+**Reason:** __{reason}__
+**Chats:** `{number_of_chats}`"""
+ try:
+ m2 = await app.send_message(
+ info["log_group_id"],
+ text=ban_text,
+ disable_web_page_preview=True,
+ )
+ await m.edit(
+ f"Fed UnBanned {user.mention} !\nAction Log: {m2.link}",
+ disable_web_page_preview=True,
+ )
+ except Exception:
+ await message.reply_text(
+ "User FUnbanned, But This Fban Action Wasn't Logged, Add Me In LOG_GROUP"
+ )
+
+
+@app.on_message(filters.command("fedstat", COMMAND_HANDLER))
+async def fedstat(client, message):
+ user = message.from_user
+ if message.chat.type != ChatType.PRIVATE:
+ await message.reply_text(
+ "Federation Ban status can only be checked by privately messaging me."
+ )
+ return
+ if len(message.command) < 2:
+ await message.reply_text("Please provide me the user name and Fed Id!")
+ return
+ user_id, fed_id = await extract_user_and_reason(message)
+ if not user_id:
+ user_id = message.from_user.id
+ fed_id = message.text.split(" ", 1)[1].strip()
+ if not fed_id:
+ return await message.reply_text(
+ "Provide me a Fed Id along with the command to search for."
+ )
+ info = await get_fed_info(fed_id)
+ if not info:
+ await message.reply_text("Please enter a valid fed id")
+ else:
+ check_user = await check_banned_user(fed_id, user_id)
+ if check_user:
+ user = await app.get_users(user_id)
+ reason = check_user["reason"]
+ date = check_user["date"]
+ return await message.reply_text(
+ f"**User {user.mention} was Fed Banned for:\n\nReason: {reason}.\nDate: {date}.**"
+ )
+ else:
+ await message.reply_text(
+ f"**User {user.mention} is not Fed Banned in this federation.**"
+ )
+
+
+@app.on_message(filters.command("fbroadcast", COMMAND_HANDLER))
+@capture_err
+async def fbroadcast_message(client, message):
+ chat = message.chat
+ from_user = message.from_user
+ reply_message = message.reply_to_message
+ if message.chat.type == ChatType.PRIVATE:
+ await message.reply_text("This command is specific to groups, not our pm!.")
+ return
+ fed_id = await get_fed_id(chat.id)
+ if not fed_id:
+ return await message.reply_text("**This chat is not a part of any federation.")
+ info = await get_fed_info(fed_id)
+ fed_owner = info["owner_id"]
+ fed_admins = info["fadmins"]
+ all_admins = [fed_owner] + fed_admins + [int(BOT_ID)]
+ if from_user.id in all_admins or from_user.id in SUDO:
+ pass
+ else:
+ return await message.reply_text(
+ "You need to be a Fed Admin to use this command"
+ )
+ if not reply_message:
+ return await message.reply_text(
+ "**You need to reply to a text message to Broadcasted it.**"
+ )
+ sleep_time = 0.1
+
+ if reply_message.text:
+ text = reply_message.text.markdown
+ else:
+ return await message.reply_text("You can only Broadcast text messages.")
+
+ reply_markup = None
+ if reply_message.reply_markup:
+ reply_markup = InlineKeyboardMarkup(reply_message.reply_markup.inline_keyboard)
+ sent = 0
+ chats, _ = await chat_id_and_names_in_fed(fed_id)
+ m = await message.reply_text(
+ f"Broadcast in progress, will take {len(chats) * sleep_time} seconds."
+ )
+ for i in chats:
+ try:
+ await app.send_message(
+ i,
+ text=text,
+ reply_markup=reply_markup,
+ )
+ sent += 1
+ await asyncio.sleep(sleep_time)
+ except FloodWait as e:
+ await asyncio.sleep(int(e.value))
+ except Exception:
+ pass
+ await m.edit(f"**Broadcasted Message In {sent} Chats.**")
+
+
+@app.on_callback_query(filters.regex("rmfed_(.*)"))
+async def del_fed_button(client, cb):
+ query = cb.data
+ userid = cb.message.chat.id
+ fed_id = query.split("_")[1]
+
+ if fed_id == "cancel":
+ await cb.message.edit_text("Federation deletion cancelled")
+ return
+
+ getfed = await get_fed_info(fed_id)
+ if getfed:
+ delete = fedsdb.delete_one({"fed_id": str(fed_id)})
+ if delete:
+ await cb.message.edit_text(
+ "You have removed your Federation! Now all the Groups that are connected with `{}` do not have a Federation.".format(
+ getfed["fed_name"]
+ ),
+ parse_mode=ParseMode.MARKDOWN,
+ )
+
+
+@app.on_callback_query(filters.regex("trfed_(.*)"))
+async def fedtransfer_button(client, cb):
+ query = cb.data
+ userid = cb.message.chat.id
+ data = query.split("_")[1]
+
+ if data == "cancel":
+ await cb.message.edit_text("Federation deletion cancelled")
+ return
+ data2 = data.split("|", 1)
+ new_owner_id = int(data2[0])
+ fed_id = data2[1]
+ transferred = await transfer_owner(fed_id, userid, new_owner_id)
+ if transferred:
+ await cb.message.edit_text(
+ "**Successfully transferred ownership to new owner.**"
+ )
+
+
+@app.on_callback_query(filters.regex("fed_(.*)"))
+async def fed_owner_help(client, cb):
+ query = cb.data
+ userid = cb.message.chat.id
+ data = query.split("_")[1]
+ if data == "owner":
+ text = """**👑 Fed Owner Only:**
+ • /newfed {from_user.id}\n"
message_out_str += f"✴️ User Name: {username}\n"
@@ -520,8 +490,8 @@ async def who_is(client, message):
if bio:
message_out_str += f"👨🏿💻 Bio: {bio}\n"
message_out_str += f"📸 Pictures: {count_pic}\n"
- message_out_str += f"🧐 Restricted: {message.from_user.is_restricted}\n"
- message_out_str += f"✅ Verified: {message.from_user.is_verified}\n"
+ message_out_str += f"🧐 Restricted: {from_user.is_restricted}\n"
+ message_out_str += f"✅ Verified: {from_user.is_verified}\n"
message_out_str += f"🌐 Profile Link: Click Here\n"
if message.chat.type.value in (("supergroup", "channel")):
try:
diff --git a/misskaty/plugins/start_help.py b/misskaty/plugins/start_help.py
index fb93cdaa..09f82cd2 100644
--- a/misskaty/plugins/start_help.py
+++ b/misskaty/plugins/start_help.py
@@ -20,6 +20,7 @@ from misskaty.helper import bot_sys_stats, paginate_modules
from misskaty.helper.localization import use_chat_lang
from misskaty.vars import COMMAND_HANDLER
+
home_keyboard_pm = InlineKeyboardMarkup(
[
[
@@ -66,6 +67,21 @@ keyboard = InlineKeyboardMarkup(
]
)
+FED_MARKUP = InlineKeyboardMarkup(
+ [
+ [
+ InlineKeyboardButton("Fed Owner Commands", callback_data="fed_owner"),
+ InlineKeyboardButton("Fed Admin Commands", callback_data="fed_admin"),
+ ],
+ [
+ InlineKeyboardButton("User Commands", callback_data="fed_user"),
+ ],
+ [
+ InlineKeyboardButton("Back", callback_data="help_back"),
+ ],
+ ]
+)
+
@app.on_message(filters.command("start", COMMAND_HANDLER))
@use_chat_lang()
@@ -89,6 +105,18 @@ async def start(_, ctx: Message, strings):
+ HELPABLE[module].__HELP__
)
await ctx.reply_msg(text, disable_web_page_preview=True)
+ if module == "federation":
+ return await ctx.reply(
+ text=text,
+ reply_markup=FED_MARKUP,
+ disable_web_page_preview=True,
+ )
+ await ctx.reply(
+ text,
+ reply_markup=InlineKeyboardMarkup(
+ [[InlineKeyboardButton("back", callback_data="help_back")]]
+ ),
+ disable_web_page_preview=True)
elif name == "help":
text, keyb = await help_parser(ctx.from_user.first_name)
await ctx.reply_msg(
@@ -202,7 +230,12 @@ async def help_button(self: Client, query: CallbackQuery, strings):
strings("help_name").format(mod=HELPABLE[module].__MODULE__)
+ HELPABLE[module].__HELP__
)
-
+ if module == "federation":
+ return await query.message.edit(
+ text=text,
+ reply_markup=FED_MARKUP,
+ disable_web_page_preview=True,
+ )
await query.message.edit_msg(
text=text,
reply_markup=InlineKeyboardMarkup(
diff --git a/misskaty/plugins/stickers.py b/misskaty/plugins/stickers.py
index 147949f6..3d901464 100644
--- a/misskaty/plugins/stickers.py
+++ b/misskaty/plugins/stickers.py
@@ -58,7 +58,7 @@ EMOJI_PATTERN = get_emoji_regex()
SUPPORTED_TYPES = ["jpeg", "png", "webp"]
-@app.on_cmd("getsticker")
+@app.on_message(filters.command(["getsticker", "toimage"], COMMAND_HANDLER))
@use_chat_lang()
async def getsticker_(self: Client, ctx: Message, strings):
if not ctx.reply_to_message:
diff --git a/misskaty/vars.py b/misskaty/vars.py
index 3186addf..37873fb3 100644
--- a/misskaty/vars.py
+++ b/misskaty/vars.py
@@ -39,6 +39,7 @@ else:
LOG_CHANNEL = int(LOG_CHANNEL)
# Optional ENV
+LOG_GROUP_ID = environ.get("LOG_GROUP_ID")
USER_SESSION = environ.get("USER_SESSION")
DATABASE_NAME = environ.get("DATABASE_NAME", "MissKatyDB")
TZ = environ.get("TZ", "Asia/Jakarta")