diff --git a/misskaty/helper/eval_helper.py b/misskaty/helper/eval_helper.py new file mode 100644 index 00000000..f6aae5ca --- /dev/null +++ b/misskaty/helper/eval_helper.py @@ -0,0 +1,123 @@ +import ast + + +# We dont modify locals VVVV ; this lets us keep the message available to the user-provided function +async def meval(code, globs, **kwargs): + # This function is released in the public domain. Feel free to kang it (although I like credit) + # Note to self: please don't set globals here as they will be lost. + # Don't clutter locals + locs = {} + # Restore globals later + globs = globs.copy() + # This code saves __name__ and __package into a kwarg passed to the function. + # It is set before the users code runs to make sure relative imports work + global_args = "_globs" + while global_args in globs.keys(): + # Make sure there's no name collision, just keep prepending _s + global_args = f"_{global_args}" + kwargs[global_args] = {} + for glob in ["__name__", "__package__"]: + # Copy data to args we are sending + kwargs[global_args][glob] = globs[glob] + + root = ast.parse(code, "exec") + code = root.body + + ret_name = "_ret" + ok = False + while True: + if ret_name in globs.keys(): + ret_name = f"_{ret_name}" + continue + for node in ast.walk(root): + if isinstance(node, ast.Name) and node.id == ret_name: + ret_name = f"_{ret_name}" + break + ok = True + if ok: + break + + if not code: + return None + + if not any(isinstance(node, ast.Return) for node in code): + for i in range(len(code)): + if isinstance(code[i], ast.Expr) and (i == len(code) - 1 or not isinstance(code[i].value, ast.Call)): + code[i] = ast.copy_location(ast.Expr(ast.Call(func=ast.Attribute(value=ast.Name(id=ret_name, ctx=ast.Load()), attr="append", ctx=ast.Load()), args=[code[i].value], keywords=[])), code[-1]) + else: + for node in code: + if isinstance(node, ast.Return): + node.value = ast.List(elts=[node.value], ctx=ast.Load()) + + code.append(ast.copy_location(ast.Return(value=ast.Name(id=ret_name, ctx=ast.Load())), code[-1])) + + # globals().update(**) + glob_copy = ast.Expr( + ast.Call(func=ast.Attribute(value=ast.Call(func=ast.Name(id="globals", ctx=ast.Load()), args=[], keywords=[]), attr="update", ctx=ast.Load()), args=[], keywords=[ast.keyword(arg=None, value=ast.Name(id=global_args, ctx=ast.Load()))]) + ) + ast.fix_missing_locations(glob_copy) + code.insert(0, glob_copy) + ret_decl = ast.Assign(targets=[ast.Name(id=ret_name, ctx=ast.Store())], value=ast.List(elts=[], ctx=ast.Load())) + ast.fix_missing_locations(ret_decl) + code.insert(1, ret_decl) + args = [] + for a in list(map(lambda x: ast.arg(x, None), kwargs.keys())): + ast.fix_missing_locations(a) + args += [a] + args = ast.arguments( + args=[], + vararg=None, + kwonlyargs=args, + kwarg=None, + defaults=[], + kw_defaults=[None for _ in range(len(args))], + ) + args.posonlyargs = [] + fun = ast.AsyncFunctionDef(name="tmp", args=args, body=code, decorator_list=[], returns=None) + ast.fix_missing_locations(fun) + mod = ast.parse("") + mod.body = [fun] + comp = compile(mod, "", "exec") + + exec(comp, {}, locs) + + r = await locs["tmp"](**kwargs) + for i in range(len(r)): + if hasattr(r[i], "__await__"): + r[i] = await r[i] # workaround for 3.5 + i = 0 + while i < len(r) - 1: + if r[i] is None: + del r[i] + else: + i += 1 + if len(r) == 1: + [r] = r + elif not r: + r = None + return r + + +import os +import traceback +from typing import List, Optional + + +def format_exception(exp: BaseException, tb: Optional[List[traceback.FrameSummary]] = None) -> str: + """Formats an exception traceback as a string, similar to the Python interpreter.""" + + if tb is None: + tb = traceback.extract_tb(exp.__traceback__) + + # Replace absolute paths with relative paths + cwd = os.getcwd() + for frame in tb: + if cwd in frame.filename: + frame.filename = os.path.relpath(frame.filename) + + stack = "".join(traceback.format_list(tb)) + msg = str(exp) + if msg: + msg = f": {msg}" + + return f"Traceback (most recent call last):\n{stack}{type(exp).__name__}{msg}" diff --git a/misskaty/plugins/dev.py b/misskaty/plugins/dev.py index 3a8e3079..cd962e99 100644 --- a/misskaty/plugins/dev.py +++ b/misskaty/plugins/dev.py @@ -4,19 +4,25 @@ import os import re import sys import pickle +import json import traceback +import cfscrape +import aiohttp from shutil import disk_usage from time import time from inspect import getfullargspec +from typing import Any, Optional, Tuple from psutil import cpu_percent from psutil import disk_usage as disk_usage_percent from psutil import virtual_memory -from pyrogram import enums, filters +from pyrogram import enums, filters, types from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from misskaty import app, user, botStartTime, BOT_NAME +from misskaty.helper.http import http +from misskaty.helper.eval_helper import meval, format_exception from misskaty.helper.localization import use_chat_lang from misskaty.helper.human_read import get_readable_file_size, get_readable_time from misskaty.core.message_utils import editPesan, hapusPesan, kirimPesan @@ -115,19 +121,24 @@ async def shell(_, m, strings): msg = await editPesan(m, strings("run_exec")) if m.from_user.is_self else await kirimPesan(m, strings("run_exec")) shell = (await shell_exec(cmd[1]))[0] if len(shell) > 3000: - with open("shell_output.txt", "w") as file: - file.write(shell) - with open("shell_output.txt", "rb") as doc: + with io.BytesIO(str.encode(shell)) as doc: + doc.name = "shell_output.txt" await m.reply_document( document=doc, + caption="cmd[1][: 4096 // 4 - 1]", file_name=doc.name, - reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text=strings("cl_btn"), callback_data=f"close#{m.from_user.id}")]]), + reply_markup=InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text=strings("cl_btn"), + callback_data=f"close#{m.from_user.id}", + ) + ] + ] + ), ) await msg.delete() - try: - os.remove("shell_output.txt") - except: - pass elif len(shell) != 0: await edit_or_reply( m, @@ -145,90 +156,102 @@ async def shell(_, m, strings): @app.on_edited_message((filters.command(["ev", "run"]) | filters.regex(r"app.run\(\)$")) & filters.user(SUDO)) @user.on_message(filters.command(["ev", "run"], ".") & filters.me) @use_chat_lang() -async def evaluation_cmd_t(_, m, strings): - if (m.command and len(m.command) == 1) or m.text == "app.run()": - return await edit_or_reply(m, text=strings("no_eval")) - cmd = m.text.split(" ", 1)[1] if m.command else m.text.split("\napp.run()")[0] - status_message = await editPesan(m, strings("run_eval")) if m.from_user.is_self else await kirimPesan(m, strings("run_eval"), quote=True) +async def cmd_eval(self, message: types.Message, strings) -> Optional[str]: + if (message.command and len(message.command) == 1) or message.text == "app.run()": + return await edit_or_reply(message, text=strings("no_eval")) + status_message = await editPesan(message, strings("run_eval")) if message.from_user.is_self else await kirimPesan(message, strings("run_eval"), quote=True) + code = message.text.split(" ", 1)[1] if message.command else message.text.split("\napp.run()")[0] + out_buf = io.StringIO() + out = "" + humantime = get_readable_time - old_stderr = sys.stderr - old_stdout = sys.stdout - redirected_output = sys.stdout = io.StringIO() - redirected_error = sys.stderr = io.StringIO() - stdout, stderr, exc = None, None, None + async def _eval() -> Tuple[str, Optional[str]]: + # Message sending helper for convenience + async def send(*args: Any, **kwargs: Any) -> types.Message: + return await message.reply(*args, **kwargs) - try: - await aexec(cmd, _, m) - except NameError as e: - trace_output = "āŒ MISSING VARIABEL:\n" - trace_output += f"{e}" - exc = trace_output - except AttributeError as e: - trace_output = "āŒ MISSING ATTRIBUTE:\n" - trace_output += f"{e}" - exc = trace_output - except SyntaxError: - trace = traceback.format_exc() - splitted = str(trace).split("\n") - end_split = len(splitted) - row_1 = splitted[end_split - 4] - row_2 = splitted[end_split - 3] - row_3 = splitted[end_split - 2] - compiles = row_1 + "\n" + row_2 + "\n" + row_3 - trace_output = "āš™ļø SYNTAX ERROR:\n" - trace_output += f"{compiles}" - exc = trace_output - except ValueError as e: - trace_output = "🧮 VALUE ERROR:\n" - trace_output += f"{e}" - exc = trace_output - except Exception as e: - # trace = traceback.format_exc() - """Periksa apakah error regexnya tertangkap""" - match = re.search(r"Telegram says: .+", str(e)) - trace_output = "āš ļø COMMON ERROR:\n" - trace_output += f"{e}" - if match: - trace_output = f"šŸ‘€ {match[0]}" - exc = trace_output + # Print wrapper to capture output + # We don't override sys.stdout to avoid interfering with other output + def _print(*args: Any, **kwargs: Any) -> None: + if "file" not in kwargs: + kwargs["file"] = out_buf + return print(*args, **kwargs) - stdout = redirected_output.getvalue() - stderr = redirected_error.getvalue() - sys.stdout = old_stdout - sys.stderr = old_stderr + eval_vars = { + "self": self, + "humantime": humantime, + "m": message, + "re": re, + "os": os, + "asyncio": asyncio, + "cfscrape": cfscrape, + "json": json, + "aiohttp": aiohttp, + "print": _print, + "send": send, + "stdout": out_buf, + "traceback": traceback, + "http": http, + "replied": message.reply_to_message, + } + try: + return "", await meval(code, globals(), **eval_vars) + except Exception as e: # skipcq: PYL-W0703 + # Find first traceback frame involving the snippet + first_snip_idx = -1 + tb = traceback.extract_tb(e.__traceback__) + for i, frame in enumerate(tb): + if frame.filename == "" or frame.filename.endswith("ast.py"): + first_snip_idx = i + break + # Re-raise exception if it wasn't caused by the snippet + # Return formatted stripped traceback + stripped_tb = tb[first_snip_idx:] + formatted_tb = format_exception(e, tb=stripped_tb) + return "āš ļø Error while executing snippet\n\n", formatted_tb - evaluation = "" - if exc: - evaluation = exc - elif stderr: - evaluation = stderr - elif stdout: - evaluation = stdout - else: - evaluation = strings("success") - - final_output = f"EVAL:\n
{cmd}
\n\nOUTPUT:\n
{evaluation.strip()}
\n" + before = time() + prefix, result = await _eval() + after = time() + # Always write result if no output has been collected thus far + if not out_buf.getvalue() or result is not None: + print(result, file=out_buf) + el_us = after - before + el_str = get_readable_time(el_us) + out = out_buf.getvalue() + # Strip only ONE final newline to compensate for our message formatting + if out.endswith("\n"): + out = out[:-1] + final_output = f"{prefix}INPUT:\n
{code}
\nOUTPUT:\n
{out}
\nExecuted Time: {el_str}" if len(final_output) > 4096: - with open("MissKatyEval.txt", "w+", encoding="utf8") as out_file: - out_file.write(final_output) - await m.reply_document( - document="MissKatyEval.txt", - caption=f"{cmd[1][: 4096 // 4 - 1]}", - disable_notification=True, - thumb="assets/thumb.jpg", - reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text=strings("cl_btn"), callback_data=f"close#{m.from_user.id}")]]), - ) - os.remove("MissKatyEval.txt") - await status_message.delete() + with io.BytesIO(str.encode(out)) as out_file: + out_file.name = "MissKatyEval.txt" + await message.reply_document( + document=out_file, + caption="code[: 4096 // 4 - 1]", + disable_notification=True, + thumb="assets/thumb.jpg", + reply_markup=InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text=strings("cl_btn"), + callback_data=f"close#{message.from_user.id}", + ) + ] + ] + ), + ) + await status_message.delete() else: await edit_or_reply( - m, + message, text=final_output, parse_mode=enums.ParseMode.HTML, - reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text=strings("cl_btn"), callback_data=f"close#{m.from_user.id}")]]), + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(text=strings("cl_btn"), callback_data=f"close#{message.from_user.id}")]]), ) - if not m.from_user.is_self: + if not message.from_user.is_self: await status_message.delete() diff --git a/misskaty/plugins/download_upload.py b/misskaty/plugins/download_upload.py index 7280504c..1f6e9188 100644 --- a/misskaty/plugins/download_upload.py +++ b/misskaty/plugins/download_upload.py @@ -135,10 +135,10 @@ async def tiktokdl(client, message): link = message.command[1] msg = await message.reply("Trying download...") try: - r = (await http.get(f"https://api.hayo.my.id/api/tiktok/4?url={link}")).json() + r = (await http.get(f"https://apimu.my.id/downloader/tiktok3?link={link}")).json() await message.reply_video( - r["linkhd"], - caption=f"Title: {r['name']}\n\nUploaded for {message.from_user.mention} [{message.from_user.id}]", + r["hasil"]["download_mp4_hd"], + caption=f"Title: {r['hasil']['video_title']}\nUploader: {r['hasil']['name']}\nšŸ‘: {r['hasil']['like']} šŸ”: {r['hasil']['share']} šŸ’¬: {r['hasil']['comment']}\n\nUploaded for {message.from_user.mention} [{message.from_user.id}]", ) await msg.delete() except Exception as e: diff --git a/misskaty/plugins/nightmodev2.py b/misskaty/plugins/nightmodev2.py index 191c4698..9546c068 100644 --- a/misskaty/plugins/nightmodev2.py +++ b/misskaty/plugins/nightmodev2.py @@ -88,6 +88,7 @@ def extract_time(time_val: str): async def un_mute_chat(chat_id: int, perm: ChatPermissions): getlang = await get_db_lang(chat_id) + getlang = getlang or "en-US" try: await app.set_chat_permissions(chat_id, perm) except ChatAdminRequired: @@ -108,6 +109,7 @@ async def un_mute_chat(chat_id: int, perm: ChatPermissions): async def mute_chat(chat_id: int): getlang = await get_db_lang(chat_id) + getlang = getlang or "en-US" try: await app.set_chat_permissions(chat_id, ChatPermissions()) except ChatAdminRequired: @@ -133,8 +135,7 @@ async def nightmode_handler(c, msg, strings): chat_id = msg.chat.id if "-d" in msg.text: - job = scheduler.get_job(job_id=f"enable_nightmode_{chat_id}") - if job: + if job := scheduler.get_job(job_id=f"enable_nightmode_{chat_id}"): scheduler.remove_job(job_id=f"enable_nightmode_{chat_id}") scheduler.remove_job(job_id=f"disable_nightmode_{chat_id}") if not bool(scheduler.get_jobs()) and bool(scheduler.state): diff --git a/misskaty/plugins/ocr.py b/misskaty/plugins/ocr.py index 8f164385..af1dd98b 100644 --- a/misskaty/plugins/ocr.py +++ b/misskaty/plugins/ocr.py @@ -6,6 +6,7 @@ * Copyright @YasirPedia All rights reserved """ + import os from pyrogram import filters @@ -20,7 +21,7 @@ from misskaty.helper.http import http from misskaty.vars import COMMAND_HANDLER __MODULE__ = "OCR" -__HELP__ = f"/ocr [reply to photo] - Read Text From Image" +__HELP__ = "/ocr [reply to photo] - Read Text From Image" @app.on_message(filters.command(["ocr"], COMMAND_HANDLER)) diff --git a/misskaty/plugins/web_scraper.py b/misskaty/plugins/web_scraper.py index cffa13ed..33edf23c 100644 --- a/misskaty/plugins/web_scraper.py +++ b/misskaty/plugins/web_scraper.py @@ -257,7 +257,7 @@ async def getDataLendrive(msg, kueri, CurrentPage, user, strings): # MelongMovie GetData async def getDataMelong(msg, kueri, CurrentPage, user, strings): if not SCRAP_DICT.get(msg.id): - data = await http.get(f"http://146.190.193.128/?s={kueri}", headers=headers, follow_redirects=True) + data = await http.get(f"https://melongmovie.info/?s={kueri}", headers=headers, follow_redirects=True) bs4 = BeautifulSoup(data, "lxml") melongdata = [] for res in bs4.select(".box"): @@ -299,7 +299,7 @@ async def getDataGomov(msg, kueri, CurrentPage, user, strings): if not kueri: await editPesan(msg, strings("no_result")) else: - await editPesan(msg, strings("no_result_w_query")) + await editPesan(msg, strings("no_result_w_query").format(kueri=kueri)) return None, 0, None data = [] for i in entry: