import asyncio import contextlib import html import io import json import os import pickle import platform import re import sys import traceback from datetime import datetime from inspect import getfullargspec from logging import getLogger from shutil import disk_usage from time import time from typing import Any, Optional, Tuple import aiohttp import cloudscraper import requests from apscheduler.schedulers.asyncio import AsyncIOScheduler from bs4 import BeautifulSoup from PIL import Image, ImageDraw, ImageFont from psutil import Process, boot_time, cpu_count, cpu_percent from psutil import disk_usage as disk_usage_percent from psutil import net_io_counters, virtual_memory from pyrogram import Client from pyrogram import __version__ as pyrover from pyrogram import enums, filters from pyrogram.errors import ( ChatSendPhotosForbidden, ChatSendPlainForbidden, FloodWait, MessageTooLong, PeerIdInvalid, ) from pyrogram.raw.types import UpdateBotStopped from pyrogram.types import ( InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto, LabeledPrice, Message, PreCheckoutQuery, ) from database.gban_db import add_gban_user, is_gbanned_user, remove_gban_user from database.users_chats_db import db from misskaty import BOT_NAME, app, botStartTime, misskaty_version, user from misskaty.core.decorator import new_task from misskaty.helper.eval_helper import format_exception, meval from misskaty.helper.functions import extract_user, extract_user_and_reason from misskaty.helper.http import fetch from misskaty.helper.human_read import get_readable_file_size, get_readable_time from misskaty.helper.localization import use_chat_lang from misskaty.vars import AUTO_RESTART, COMMAND_HANDLER, LOG_CHANNEL, SUDO __MODULE__ = "DevCommand" __HELP__ = """ **For Owner Bot Only.** /run [args] - Run eval CMD /logs [int] - Check logs bot /shell [args] - Run Exec/Terminal CMD /download [link/reply_to_telegram_file] - Download file from Telegram /disablechat [chat id] - Remove blacklist group /enablechat [chat id] - Add Blacklist group /banuser [chat id] - Ban user and block user so cannot use bot /unbanuser [chat id] - Unban user and make their can use bot again /gban - To Ban A User Globally. /ungban - To remove ban user globbaly. /restart - update and restart bot. **For Public Use** /stats - Check statistic bot /json - Send structure message Telegram in JSON using Pyrogram Style. """ var = {} teskode = {} LOGGER = getLogger("MissKaty") async def edit_or_reply(self, msg, **kwargs): func = msg.edit if not self.me.is_bot else msg.reply spec = getfullargspec(func.__wrapped__).args await func(**{k: v for k, v in kwargs.items() if k in spec}) @app.on_message(filters.command(["stars"], COMMAND_HANDLER)) async def star_donation(self: Client, ctx: Message): amount = ctx.command[1] if len(ctx.command) == 2 and ctx.command[1].isdigit() else 5 await self.send_invoice( ctx.chat.id, title="MissKaty Donate", description="You can give me donation via star", currency="XTR", prices=[LabeledPrice(label="Donation", amount=amount)], message_thread_id=ctx.message_thread_id, payload="stars", ) @app.on_pre_checkout_query() async def pre_checkout_query_handler(_: Client, query: PreCheckoutQuery): await query.answer(success=True) @app.on_message(filters.private, group=3) async def successful_payment_handler(_: Client, message: Message): if message.successful_payment: await message.reply( f"Thanks for support for {message.successful_payment.total_amount} {message.successful_payment.currency}! Your transaction ID is : {message.successful_payment.telegram_payment_charge_id}" ) @app.on_message(filters.command(["refund_star"], COMMAND_HANDLER)) async def refund_star_payment(client: Client, message: Message): if len(message.command) == 1: return await message.reply_msg( "Please input telegram_payment_charge_id after command." ) trx_id = message.command[1] try: await client.refund_star_payment(message.from_user.id, trx_id) await message.reply_msg( f"Great {message.from_user.mention}, your stars has been refunded to your balance." ) except Exception as e: await message.reply_msg(e) @app.on_message(filters.command(["logs"], COMMAND_HANDLER) & filters.user(SUDO)) @use_chat_lang() async def log_file(_, ctx: Message, strings): """Send log file""" msg = await ctx.reply_msg("Reading bot logs ...", quote=True) if len(ctx.command) == 1: try: with open("MissKatyLogs.txt", "r") as file: content = file.read() data = { "value": content, } pastelog = await fetch.post( "https://paste.yasirapi.eu.org/save", data=data, follow_redirects=True ) await msg.edit_msg( f"Here the Logs\nlog size: {get_readable_file_size(os.path.getsize('MissKatyLogs.txt'))}" ) except Exception: await ctx.reply_document( "MissKatyLogs.txt", caption="Log Bot MissKatyPyro", reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( strings("cl_btn"), f"close#{ctx.from_user.id}", ) ] ] ), ) await msg.delete_msg() elif len(ctx.command) == 2: val = ctx.text.split() tail = await shell_exec(f"tail -n {val[1]} -v MissKatyLogs.txt") try: await msg.edit_msg(f"
{html.escape(tail[0])}
") except MessageTooLong: with io.BytesIO(str.encode(tail[0])) as s: s.name = "MissKatyLog-Tail.txt" await ctx.reply_document(s) await msg.delete() else: await msg.edit_msg("Unsupported parameter") @app.on_message(filters.command(["donate"], COMMAND_HANDLER)) async def donate(self: Client, ctx: Message): try: await self.send_photo( ctx.chat.id, "https://img.yasirweb.eu.org/file/ee74ce527fb8264b54691.jpg", caption="Hi, If you find this bot useful, you can make a donation to the account below. Because this bot server uses VPS and is not free. Thank You..\n\nIndonesian Payment:\nQRIS: https://img.yasirweb.eu.org/file/ee74ce527fb8264b54691.jpg (Yasir Store)\nBank Jago: 109641845083 (Yasir Aris M)\n\nFor international people can use PayPal to support me or via GitHub Sponsor:\nhttps://paypal.me/yasirarism\nhttps://github.com/sponsors/yasirarism\n\nSource: @BeriKopi", reply_to_message_id=ctx.id, message_effect_id=5159385139981059251 if ctx.chat.type.value == "private" else None, ) except (ChatSendPlainForbidden, ChatSendPhotosForbidden): await self.send_message( LOG_CHANNEL, f"❗️ WARNING\nI'm leaving from {ctx.chat.id} since i didn't have sufficient admin permissions.", ) await ctx.chat.leave() @app.on_message( filters.command(["balas"], COMMAND_HANDLER) & filters.user(SUDO) & filters.reply ) async def balas(_, ctx: Message) -> "str": pesan = ctx.input await ctx.delete_msg() await ctx.reply_msg(pesan, reply_to_message_id=ctx.reply_to_message.id) @app.on_message(filters.command(["stats"], COMMAND_HANDLER)) @new_task async def server_stats(_, ctx: Message) -> "Message": """ Give system stats of the server. """ total, used, free = disk_usage(".") process = Process(os.getpid()) botuptime = get_readable_time(time() - botStartTime) osuptime = get_readable_time(time() - boot_time()) currentTime = get_readable_time(time() - botStartTime) botusage = f"{round(process.memory_info()[0]/1024 ** 2)} MB" upload = get_readable_file_size(net_io_counters().bytes_sent) download = get_readable_file_size(net_io_counters().bytes_recv) cpu_percentage = cpu_percent() cpu_counts = cpu_count() ram_percentage = virtual_memory().percent ram_total = get_readable_file_size(virtual_memory().total) ram_used = get_readable_file_size(virtual_memory().used) disk_percenatge = disk_usage_percent("/").percent disk_total = get_readable_file_size(total) disk_used = get_readable_file_size(used) disk_free = get_readable_file_size(free) neofetch = (await shell_exec("neofetch --stdout"))[0] caption = f"{BOT_NAME} {misskaty_version} is Up and Running successfully.\n\n{neofetch}\n\n**OS Uptime:** {osuptime}\nBot Uptime: {currentTime}\n**Bot Usage:** {botusage}\n\n**Total Space:** {disk_total}\n**Free Space:** {disk_free}\n\n**Download:** {download}\n**Upload:** {upload}\n\nPyroFork Version: {pyrover}\nPython Version: {sys.version_info[0]}.{sys.version_info[1]}.{sys.version_info[2]} {sys.version_info[3].title()}" if "oracle" in platform.uname().release: return await ctx.reply_msg(caption, quote=True) start = datetime.now() msg = await ctx.reply_photo( photo="https://te.legra.ph/file/30a82c22854971d0232c7.jpg", caption=caption, quote=True, ) end = datetime.now() image = Image.open("assets/statsbg.jpg").convert("RGB") IronFont = ImageFont.truetype("assets/IronFont.otf", 42) draw = ImageDraw.Draw(image) def draw_progressbar(coordinate, progress): progress = 110 + (progress * 10.8) draw.ellipse((105, coordinate - 25, 127, coordinate), fill="#FFFFFF") draw.rectangle((120, coordinate - 25, progress, coordinate), fill="#FFFFFF") draw.ellipse( (progress - 7, coordinate - 25, progress + 15, coordinate), fill="#FFFFFF" ) draw_progressbar(243, int(cpu_percentage)) draw.text( (225, 153), f"( {cpu_counts} core, {cpu_percentage}% )", (255, 255, 255), font=IronFont, ) draw_progressbar(395, int(disk_percenatge)) draw.text( (335, 302), f"( {disk_used} / {disk_total}, {disk_percenatge}% )", (255, 255, 255), font=IronFont, ) draw_progressbar(533, int(ram_percentage)) draw.text( (225, 445), f"( {ram_used} / {ram_total} , {ram_percentage}% )", (255, 255, 255), font=IronFont, ) draw.text((335, 600), f"{botuptime}", (255, 255, 255), font=IronFont) draw.text( (857, 607), f"{(end-start).microseconds/1000} ms", (255, 255, 255), font=IronFont, ) image.save("stats.png") await msg.edit_media(media=InputMediaPhoto("stats.png", caption=caption)) os.remove("stats.png") # Gban @app.on_message(filters.command("gban", COMMAND_HANDLER) & filters.user(SUDO)) async def ban_globally(self: Client, ctx: Message): user_id, reason = await extract_user_and_reason(ctx) if not user_id: return await ctx.reply_text("I can't find that user.") if not reason: return await ctx.reply("No reason provided.") try: getuser = await app.get_users(user_id) user_mention = getuser.mention user_id = user.id except PeerIdInvalid: user_mention = int(user_id) user_id = int(user_id) from_user = ctx.from_user if user_id in [from_user.id, self.me.id] or user_id in SUDO: return await ctx.reply_text("I can't ban that user.") served_chats = await db.get_all_chats() m = await ctx.reply_text( f"**Banning {user_mention} Globally! This may take several times.**" ) await add_gban_user(user_id) number_of_chats = 0 async for served_chat in served_chats: try: await app.ban_chat_member(served_chat["id"], user_id) number_of_chats += 1 await asyncio.sleep(1) except FloodWait as e: await asyncio.sleep(int(e.value)) except Exception: pass with contextlib.suppress(Exception): await app.send_message( user_id, f"Hello, You have been globally banned by {from_user.mention}, You can appeal for this ban by talking to him.", ) await m.edit(f"Banned {user_mention} Globally!") ban_text = f""" __**New Global Ban**__ **Origin:** {ctx.chat.title} [`{ctx.chat.id}`] **Admin:** {from_user.mention} **Banned User:** {user_mention} **Banned User ID:** `{user_id}` **Reason:** __{reason}__ **Chats:** `{number_of_chats}`""" try: m2 = await app.send_message( LOG_CHANNEL, text=ban_text, disable_web_page_preview=True, ) await m.edit( f"Banned {user_mention} Globally!\nAction Log: {m2.link}", disable_web_page_preview=True, ) except Exception: await ctx.reply_text( "User Gbanned, But This Gban Action Wasn't Logged, Add Me In LOG_CHANNEL" ) # Ungban @app.on_message(filters.command("ungban", COMMAND_HANDLER) & filters.user(SUDO)) async def unban_globally(_, ctx: Message): user_id = await extract_user(ctx) if not user_id: return await ctx.reply_text("I can't find that user.") try: getuser = await app.get_users(user_id) user_mention = getuser.mention user_id = user.id except PeerIdInvalid: user_mention = int(user_id) user_id = int(user_id) is_gbanned = await is_gbanned_user(user_id) if not is_gbanned: await ctx.reply_text("I don't remember Gbanning him.") else: await remove_gban_user(user_id) await ctx.reply_text(f"Lifted {user_mention}'s Global Ban.'") @app.on_message( filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO) ) @app.on_edited_message( filters.command(["shell", "sh", "term"], COMMAND_HANDLER) & filters.user(SUDO) & ~filters.react ) @user.on_message(filters.command(["shell", "sh", "term"], ".") & filters.me) @use_chat_lang() async def shell_cmd(self: Client, ctx: Message, strings): if len(ctx.command) == 1: return await edit_or_reply(self, ctx, text=strings("no_cmd")) msg = ( await ctx.edit_msg(strings("run_exec")) if not self.me.is_bot else await ctx.reply_msg(strings("run_exec")) ) shell = (await shell_exec(ctx.input))[0] if len(shell) > 3000: with io.BytesIO(str.encode(shell)) as doc: doc.name = "shell_output.txt" await ctx.reply_document( document=doc, caption=f"{ctx.input[: 4096 // 4 - 1]}", file_name=doc.name, reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( text=strings("cl_btn"), callback_data=f"close#{ctx.from_user.id if ctx.from_user else self.me.id}", ) ] ] ), ) await msg.delete_msg() elif len(shell) != 0: await edit_or_reply( self, ctx, text=html.escape(shell), parse_mode=enums.ParseMode.HTML, reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( text=strings("cl_btn"), callback_data=f"close#{ctx.from_user.id if ctx.from_user else self.me.id}", ) ] ] ), ) if self.me.is_bot: await msg.delete_msg() else: await ctx.reply_msg(strings("no_reply"), del_in=5) @app.on_message( ( filters.command(["ev", "run", "myeval"], COMMAND_HANDLER) | filters.regex(r"app.run\(\)$") ) & filters.user(SUDO) ) @app.on_edited_message( ( filters.command(["ev", "run", "meval"], COMMAND_HANDLER) | filters.regex(r"app.run\(\)$") ) & filters.user(SUDO) & ~filters.react ) @user.on_message(filters.command(["ev", "run", "meval"], ".") & filters.me) @use_chat_lang() async def cmd_eval(self: Client, ctx: Message, strings) -> Optional[str]: if (ctx.command and len(ctx.command) == 1) or ctx.text == "app.run()": return await edit_or_reply(self, ctx, text=strings("no_eval")) status_message = ( await ctx.edit_msg(strings("run_eval")) if not self.me.is_bot else await ctx.reply_msg(strings("run_eval"), quote=True) ) code = ( ctx.text.split(maxsplit=1)[1] if ctx.command else ctx.text.split("\napp.run()")[0] ) out_buf = io.StringIO() out = "" humantime = get_readable_time async def _eval() -> Tuple[str, Optional[str]]: # Message sending helper for convenience async def send(*args: Any, **kwargs: Any) -> Message: return await ctx.reply_msg(*args, **kwargs) # Print wrapper to capture output # We don't override sys.stdout to avoid interfering with other output def _print(*args: Any, **kwargs: Any) -> None: if "file" not in kwargs: kwargs["file"] = out_buf return print(*args, **kwargs) def _help(*args: Any, **kwargs: Any) -> None: with contextlib.redirect_stdout(out_buf): help(*args, **kwargs) eval_vars = { "self": self, "humantime": humantime, "ctx": ctx, "var": var, "teskode": teskode, "re": re, "os": os, "asyncio": asyncio, "cloudscraper": cloudscraper, "json": json, "aiohttp": aiohttp, "print": _print, "send": send, "stdout": out_buf, "traceback": traceback, "fetch": fetch, "r": ctx.reply_to_message, "requests": requests, "soup": BeautifulSoup, "h": _help, } eval_vars.update(var) eval_vars.update(teskode) try: return "", await meval(code, globals(), **eval_vars) except Exception as e: # skipcq: PYL-W0703 # Find first traceback frame involving the snippet first_snip_idx = -1 tb = traceback.extract_tb(e.__traceback__) for i, frame in enumerate(tb): if frame.filename == "" or frame.filename.endswith("ast.py"): first_snip_idx = i break # Re-raise exception if it wasn't caused by the snippet # Return formatted stripped traceback stripped_tb = tb[first_snip_idx:] formatted_tb = format_exception(e, tb=stripped_tb) return "⚠️ Error while executing snippet\n\n", formatted_tb before = time() prefix, result = await _eval() after = time() # Always write result if no output has been collected thus far if not out_buf.getvalue() or result is not None: print(result, file=out_buf) el_us = after - before try: el_str = get_readable_time(el_us) except: el_str = "1s" if not el_str or el_str is None: el_str = "0.1s" out = out_buf.getvalue() # Strip only ONE final newline to compensate for our message formatting if out.endswith("\n"): out = out[:-1] final_output = f"{prefix}INPUT:\n
{html.escape(code)}
\nOUTPUT:\n
{html.escape(out)}
\nExecuted Time: {el_str}" if len(final_output) > 4096: with io.BytesIO(str.encode(out)) as out_file: out_file.name = "MissKatyEval.txt" await ctx.reply_document( document=out_file, caption=f"{code[: 4096 // 4 - 1]}", disable_notification=True, thumb="assets/thumb.jpg", reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( text=strings("cl_btn"), callback_data=f"close#{ctx.from_user.id if ctx.from_user else self.me.id}", ) ] ] ), ) await status_message.delete_msg() else: await edit_or_reply( self, ctx, text=final_output, parse_mode=enums.ParseMode.HTML, reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( text=strings("cl_btn"), callback_data=f"close#{ctx.from_user.id if ctx.from_user else self.me.id}", ) ] ] ), ) if self.me.is_bot: await status_message.delete_msg() # Update and restart bot @app.on_message(filters.command(["restart"], COMMAND_HANDLER) & filters.user(SUDO)) @use_chat_lang() async def update_restart(_, ctx: Message, strings): msg = await ctx.reply_msg(strings("up_and_rest")) await shell_exec("python3 update.py") with open("restart.pickle", "wb") as status: pickle.dump([ctx.chat.id, msg.id], status) os.execvp(sys.executable, [sys.executable, "-m", "misskaty"]) @app.on_poll(filters.chat(-1001777794636), group=-5) async def tespoll(self, msg): await ctx.reply(msg) @app.on_raw_update(group=-99) async def updtebot(client, update, users, _): if isinstance(update, UpdateBotStopped): niuser = users[update.user_id] if update.stopped and await db.is_user_exist(niuser.id): await db.delete_user(niuser.id) await client.send_msg( LOG_CHANNEL, f"{niuser.first_name} ({niuser.id}) " f"{'BLOCKED' if update.stopped else 'UNBLOCKED'} the bot at " f"{datetime.fromtimestamp(update.date)}", ) async def aexec(code, c, m): exec( "async def __aexec(c, m): " + "\n p = print" + "\n replied = m.reply_to_message" + "".join(f"\n {l_}" for l_ in code.split("\n")) ) return await locals()["__aexec"](c, m) async def shell_exec(code, treat=True): process = await asyncio.create_subprocess_shell( code, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) stdout = (await process.communicate())[0] if treat: stdout = stdout.decode().strip() return stdout, process async def auto_restart(): await shell_exec("python3 update.py") os.execvp(sys.executable, [sys.executable, "-m", "misskaty"]) if AUTO_RESTART: scheduler = AsyncIOScheduler(timezone="Asia/Jakarta") scheduler.add_job(auto_restart, trigger="interval", days=3) scheduler.start()