""" * @author yasir * @date 2022-12-01 09:12:27 * @projectName MissKatyPyro * Copyright @YasirPedia All rights reserved """ import ast import asyncio import contextlib import html import json import os import re import sys import traceback from logging import getLogger from urllib.parse import quote import aiohttp import httpx from bs4 import BeautifulSoup from deep_translator import GoogleTranslator from gtts import gTTS from PIL import Image from pyrogram import Client, filters from pyrogram.errors import ( ChatAdminRequired, MessageTooLong, QueryIdInvalid, UserNotParticipant, ) from pyrogram.types import ( CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, ) from misskaty import BOT_USERNAME, app from misskaty.core.decorator.errors import capture_err from misskaty.helper.http import fetch from misskaty.helper.tools import gen_trans_image, rentry from misskaty.vars import COMMAND_HANDLER from utils import extract_user, get_file_id LOGGER = getLogger("MissKaty") __MODULE__ = "Misc" __HELP__ = """ /carbon [text or reply to text or caption] - Make beautiful snippet code on carbon from text. /removebg [Reply to image] - Remove background from image. /calc - Simple math calculator using inline buttons. /kbbi [keyword] - Search definition on KBBI (For Indonesian People) /sof [query] - Search your problem in StackOverflow. /google [query] - Search using Google Search. (/tr, /trans, /translate) [lang code] - Translate text using Google Translate. /tts - Convert Text to Voice. /imdb [query] - Find Movie Details From IMDB.com (Available in English and Indonesia version). /readqr [reply to photo] - Read QR Code From Photo. /createqr [text] - Convert Text to QR Code. /anime [query] - Search title in myanimelist. /info - Get info user with Pic and full description if user set profile picture. /id - Get simple user ID. """ def remove_html_tags(text): """Remove html tags from a string""" clean = re.compile("<.*?>") return re.sub(clean, "", text) def calcExpression(text): try: return float(ast.literal_eval(text)) except (SyntaxError, ZeroDivisionError): return "" except TypeError: return float(ast.literal_eval(text.replace("(", "*("))) except Exception as e: LOGGER.error(e, exc_info=True) return "" def calc_btn(uid): return InlineKeyboardMarkup( [ [ InlineKeyboardButton("DEL", callback_data=f"calc|{uid}|DEL"), InlineKeyboardButton("AC", callback_data=f"calc|{uid}|AC"), InlineKeyboardButton("(", callback_data=f"calc|{uid}|("), InlineKeyboardButton(")", callback_data=f"calc|{uid}|)"), ], [ InlineKeyboardButton("7", callback_data=f"calc|{uid}|7"), InlineKeyboardButton("8", callback_data=f"calc|{uid}|8"), InlineKeyboardButton("9", callback_data=f"calc|{uid}|9"), InlineKeyboardButton("Γ·", callback_data=f"calc|{uid}|/"), ], [ InlineKeyboardButton("4", callback_data=f"calc|{uid}|4"), InlineKeyboardButton("5", callback_data=f"calc|{uid}|5"), InlineKeyboardButton("6", callback_data=f"calc|{uid}|6"), InlineKeyboardButton("Γ—", callback_data=f"calc|{uid}|*"), ], [ InlineKeyboardButton("1", callback_data=f"calc|{uid}|1"), InlineKeyboardButton("2", callback_data=f"calc|{uid}|2"), InlineKeyboardButton("3", callback_data=f"calc|{uid}|3"), InlineKeyboardButton("-", callback_data=f"calc|{uid}|-"), ], [ InlineKeyboardButton(".", callback_data=f"calc|{uid}|."), InlineKeyboardButton("0", callback_data=f"calc|{uid}|0"), InlineKeyboardButton("=", callback_data=f"calc|{uid}|="), InlineKeyboardButton("+", callback_data=f"calc|{uid}|+"), ], ] ) @app.on_message(filters.command(["calc", "calculate", "calculator"])) async def calculate_handler(self, ctx): if not ctx.from_user: return await ctx.reply_text( text=f"Made by @{self.me.username}", reply_markup=calc_btn(ctx.from_user.id), disable_web_page_preview=True, quote=True, ) @app.on_callback_query(filters.regex("^calc")) async def calc_cb(self, query): _, uid, data = query.data.split("|") if query.from_user.id != int(uid): return await query.answer("Who are you??", show_alert=True, cache_time=5) try: text = query.message.text.split("\n")[0].strip().split("=")[0].strip() text = "" if f"Made by @{self.me.username}" in text else text inpt = text + query.data result = "" if data == "=": result = calcExpression(text) text = "" elif data == "DEL": text = text[:-1] elif data == "AC": text = "" else: dot_dot_check = re.findall(r"(\d*\.\.|\d*\.\d+\.)", inpt) opcheck = re.findall(r"([*/\+-]{2,})", inpt) if not dot_dot_check and not opcheck: if strOperands := re.findall(r"(\.\d+|\d+\.\d+|\d+)", inpt): text += data result = calcExpression(text) text = f"{text:<50}" if result: if text: text += f"\n{result:>50}" else: text = result text += f"\n\nMade by @{self.me.username}" await query.message.edit_msg( text=text, disable_web_page_preview=True, reply_markup=calc_btn(query.from_user.id), ) except Exception as error: LOGGER.error(error) @app.on_cmd("removebg") async def removebg(_, ctx: Client): if not ctx.reply_to_message: return await ctx.reply_msg("Please reply image.") if not ctx.reply_to_message.photo: return await ctx.reply_msg("Only support photo for remove background.") prg = await ctx.reply("Processing...") source = await ctx.reply_to_message.download() await gen_trans_image(source, f"transp_bckgrnd-{ctx.from_user.id}.png") await ctx.reply_document(f"transp_bckgrnd-{ctx.from_user.id}.png") await prg.delete_msg() os.remove(source) os.remove(f"transp_bckgrnd-{ctx.from_user.id}.png") @app.on_cmd("kbbi") async def kbbi_search(_, ctx: Client): if len(ctx.command) == 1: return await ctx.reply_msg("Please add keyword to search definition in kbbi") try: r = await fetch.get(f"https://yasirapi.eu.org/kbbi?kata={ctx.input}") except httpx.HTTPError as e: return await ctx.reply_msg(f"HTTP error occured: {e}") if r.status_code != 200: return await ctx.reply("Maaf, makna kata tersebut tidak ditemukan.") parse = r.json() if nomsg := parse.get("detail"): return await ctx.reply_msg(nomsg) kbbi_btn = InlineKeyboardMarkup( [[InlineKeyboardButton(text="Open in Web", url=parse.get("link"))]] ) res = "Definisi:\n" for _, a in enumerate(parse.get("result"), start=1): submakna = "".join(f"{a}, " for a in a["makna"][0]["submakna"])[:-2] contoh = "".join(f"{a}, " for a in a["makna"][0]["contoh"])[:-2] kt_dasar = "".join(f"{a}, " for a in a["kata_dasar"])[:-2] bt_takbaku = "".join(f"{a}, " for a in a["bentuk_tidak_baku"])[:-2] res += f"{a['nama']} ({a['makna'][0]['kelas'][0]['nama']}: {a['makna'][0]['kelas'][0]['deskripsi']})\nKata Dasar: {kt_dasar if kt_dasar else '-'}\nBentuk Tidak Baku: {bt_takbaku if bt_takbaku else '-'}\nSubmakna: {submakna}\nContoh: {contoh if contoh else '-'}\n\n" await ctx.reply(f"{res}By YasirPedia API", reply_markup=kbbi_btn) @app.on_cmd("carbon") async def carbon_make(self: Client, ctx: Message): if ctx.reply_to_message and ctx.reply_to_message.text: text = ctx.reply_to_message.text elif ctx.reply_to_message and ctx.reply_to_message.caption: text = ctx.reply_to_message.caption elif len(ctx.command) > 1: text = ctx.input else: return await ctx.reply( "Please reply text to make carbon or add text after command." ) json_data = { "code": text, "backgroundColor": "#1F816D", } with contextlib.redirect_stdout(sys.stderr): try: response = await fetch.post( "https://carbon.yasirapi.eu.org/api/cook", json=json_data, timeout=20 ) except httpx.HTTPError as exc: return await ctx.reply_msg(f"HTTP Exception for {exc.request.url} - {exc}") if response.status_code != 200: return await ctx.reply_photo( f"https://http.cat/{response.status_code}", caption="🀧 Carbon API ERROR", ) fname = ( f"carbonBY_{ctx.from_user.id if ctx.from_user else ctx.sender_chat.title}.png" ) with open(fname, "wb") as e: e.write(response.content) await ctx.reply_photo(fname, caption=f"Generated by @{self.me.username}") os.remove(fname) @app.on_message(filters.command("readqr", COMMAND_HANDLER)) async def readqr(c, m): if not m.reply_to_message: return await m.reply("Please reply photo that contain valid QR Code.") if not m.reply_to_message.photo: return await m.reply("Please reply photo that contain valid QR Code.") foto = await m.reply_to_message.download() myfile = {"file": (foto, open(foto, "rb"), "application/octet-stream")} url = "http://api.qrserver.com/v1/read-qr-code/" r = await fetch.post(url, files=myfile) os.remove(foto) if res := r.json()[0]["symbol"][0]["data"] is None: return await m.reply_msg(res) await m.reply_msg( f"QR Code Reader by @{c.me.username}: {r.json()[0]['symbol'][0]['data']}", quote=True, ) @app.on_message(filters.command("createqr", COMMAND_HANDLER)) async def makeqr(c, m): if m.reply_to_message and m.reply_to_message.text: teks = m.reply_to_message.text elif len(m.command) > 1: teks = m.text.split(None, 1)[1] else: return await m.reply( "Please add text after command to convert text -> QR Code." ) url = f"https://api.qrserver.com/v1/create-qr-code/?data={quote(teks)}&size=300x300" await m.reply_photo( url, caption=f"QR Code Maker by @{c.me.username}", quote=True ) @app.on_message(filters.command(["sof"], COMMAND_HANDLER)) @capture_err async def stackoverflow(_, message): if len(message.command) == 1: return await message.reply("Give a query to search in StackOverflow!") r = ( await fetch.get( f"https://api.stackexchange.com/2.3/search/excerpts?order=asc&sort=relevance&q={message.command[1]}&accepted=True&migrated=FalseΒ¬ice=False&wiki=False&site=stackoverflow" ) ).json() msg = await message.reply("Getting data..") hasil = "" for count, data in enumerate(r["items"], start=1): question = data["question_id"] title = data["title"] snippet = ( remove_html_tags(data["excerpt"])[:80].replace("\n", "").replace(" ", "") if len(remove_html_tags(data["excerpt"])) > 80 else remove_html_tags(data["excerpt"]).replace("\n", "").replace(" ", "") ) hasil += f"{count}. {title}\n{snippet}\n" try: await msg.edit(hasil) except MessageTooLong: url = await rentry(hasil) await msg.edit(f"Your text pasted to rentry because has long text:\n{url}") except Exception as e: await msg.edit(e) @app.on_message(filters.command(["google"], COMMAND_HANDLER)) @capture_err async def gsearch(self, message): if len(message.command) == 1: return await message.reply("Give a query to search in Google!") def shorten_text(text): if len(text) > 150: return text[:150] + "..." return text query = message.text.split(maxsplit=1)[1] msg = await message.reply_text(f"**Googling** for `{query}` ...") try: gs = await fetch.get( f"https://www.google.com/search?q={query}&gl=id&hl=id&num=16", ) soup = BeautifulSoup(gs.text, "lxml") # collect data data = [] for result in soup.select(".tF2Cxc"): link = result.select_one(".yuRUbf a")["href"] title = result.select_one(".DKV0Md").text if snippet := result.find(class_="VwiC3b yXK7lf lVm3ye r025kc hJNv6b"): snippet = snippet.get_text() elif snippet := result.find(class_="VwiC3b yXK7lf lVm3ye r025kc hJNv6b Hdw6tb"): snippet = snippet.get_text() else: snippet = "-" data.append( { "title": html.escape(title), "link": link, "snippet": shorten_text(html.escape(snippet)), } ) arr = json.dumps(data, indent=2, ensure_ascii=False) parse = json.loads(arr) total = len(parse) res = "".join( f"{i['title']}\n{i['snippet']}\n\n" for i in parse ) except Exception: exc = traceback.format_exc() return await msg.edit(exc) await msg.edit_msg( text=f"Ada {total} Hasil Pencarian dari {query}:\n{res}GoogleSearch by @{BOT_USERNAME}", disable_web_page_preview=True, ) @app.on_message(filters.command(["tr", "trans", "translate"], COMMAND_HANDLER)) @capture_err async def translate(_, message): if message.reply_to_message and ( message.reply_to_message.text or message.reply_to_message.caption ): target_lang = "id" if len(message.command) == 1 else message.text.split()[1] text = message.reply_to_message.text or message.reply_to_message.caption else: if len(message.command) < 3: return await message.reply_msg( "Berikan Kode bahasa yang valid.\n[Available options](https://tgraph.yasirweb.eu.org/Lang-Codes-11-08).\nUsage: /tr en", ) target_lang = message.text.split(None, 2)[1] text = message.text.split(None, 2)[2] msg = await message.reply_msg("Menerjemahkan...") try: my_translator = GoogleTranslator(source="auto", target=target_lang) result = my_translator.translate(text=text) await msg.edit_msg( f"Translation using source = {my_translator.source} and target = {my_translator.target}\n\n-> {result}" ) except MessageTooLong: url = await rentry(result) await msg.edit_msg( f"Your translated text pasted to rentry because has long text:\n{url}" ) except Exception as err: await msg.edit_msg(f"Oppss, Error: {str(err)}") @app.on_message(filters.command(["tts"], COMMAND_HANDLER)) @capture_err async def tts_convert(_, message): if message.reply_to_message and ( message.reply_to_message.text or message.reply_to_message.caption ): if len(message.text.split()) == 1: target_lang = "id" else: target_lang = message.text.split()[1] text = message.reply_to_message.text or message.reply_to_message.caption else: if len(message.text.split()) <= 2: await message.reply_text( "Berikan Kode bahasa yang valid.\n[Available options](https://telegra.ph/Lang-Codes-11-08).\n*Usage:* /tts en [text]", ) return target_lang = message.text.split(None, 2)[1] text = message.text.split(None, 2)[2] msg = await message.reply("Converting to voice...") fname = f"tts_BY_{message.from_user.id if message.from_user else message.sender_chat.title}.mp3" try: tts = gTTS(text, lang=target_lang) tts.save(fname) except ValueError as err: await msg.edit(f"Error: {str(err)}") return await msg.delete() await msg.reply_audio(fname) if os.path.exists(fname): os.remove(fname) @app.on_message(filters.command(["tosticker"], COMMAND_HANDLER)) @capture_err async def tostick(client, message): try: if not message.reply_to_message or not message.reply_to_message.photo: return await message.reply_text("Reply ke foto untuk mengubah ke sticker") sticker = await client.download_media( message.reply_to_message.photo.file_id, f"tostick_{message.from_user.id}.webp", ) await message.reply_sticker(sticker) os.remove(sticker) except Exception as e: await message.reply_text(str(e)) @app.on_message(filters.command(["id"], COMMAND_HANDLER)) async def showid(_, message): chat_type = message.chat.type.value if chat_type == "private": user_id = message.chat.id first = message.from_user.first_name last = message.from_user.last_name or "" username = message.from_user.username dc_id = message.from_user.dc_id or "" await message.reply_text( f"➲ First Name: {first}\n➲ Last Name: {last}\n➲ Username: {username}\n➲ Telegram ID: {user_id}\n➲ Data Centre: {dc_id}", quote=True, ) elif chat_type in ["group", "supergroup"]: _id = "" _id += "➲ Chat ID: " f"{message.chat.id}\n" if message.reply_to_message: _id += ( "➲ User ID: " f"{message.from_user.id if message.from_user else 'Anonymous'}\n" "➲ Replied User ID: " f"{message.reply_to_message.from_user.id if message.reply_to_message.from_user else 'Anonymous'}\n" ) file_info = get_file_id(message.reply_to_message) else: _id += ( "➲ User ID: " f"{message.from_user.id if message.from_user else 'Anonymous'}\n" ) file_info = get_file_id(message) if file_info: _id += ( f"{file_info.message_type}: " f"{file_info.file_id}\n" ) await message.reply_text(_id, quote=True) @app.on_message(filters.command(["info"], COMMAND_HANDLER)) async def who_is(client, message): # https://github.com/SpEcHiDe/PyroGramBot/blob/master/pyrobot/plugins/admemes/whois.py#L19 if message.sender_chat: return await message.reply_msg("Not supported channel..") status_message = await message.reply_text("`Fetching user info...`") await status_message.edit("`Processing user info...`") from_user = None from_user_id, _ = extract_user(message) try: from_user = await client.get_users(from_user_id) except Exception as error: return await status_message.edit(str(error)) if from_user is None: return await status_message.edit("No valid user_id / message specified") message_out_str = "" username = f"@{from_user.username}" or "No Username" dc_id = from_user.dc_id or "[User Doesn't Have Profile Pic]" bio = (await client.get_chat(from_user.id)).bio count_pic = await client.get_chat_photos_count(from_user.id) message_out_str += f"πŸ”Έ First Name: {from_user.first_name}\n" if last_name := from_user.last_name: message_out_str += f"πŸ”Ή Last Name: {last_name}\n" message_out_str += f"πŸ†” User ID: {from_user.id}\n" message_out_str += f"✴️ User Name: {username}\n" message_out_str += f"πŸ’  Data Centre: {dc_id}\n" if bio: message_out_str += f"πŸ‘¨πŸΏβ€πŸ’» Bio: {bio}\n" message_out_str += f"πŸ“Έ Pictures: {count_pic}\n" message_out_str += f"🧐 Restricted: {from_user.is_restricted}\n" message_out_str += f"βœ… Verified: {from_user.is_verified}\n" message_out_str += f"🌐 Profile Link: Click Here\n" if message.chat.type.value in (("supergroup", "channel")): with contextlib.suppress(UserNotParticipant, ChatAdminRequired): chat_member_p = await message.chat.get_member(from_user.id) joined_date = chat_member_p.joined_date message_out_str += ( "➲Joined this Chat on: " f"{joined_date}" "\n" ) if chat_photo := from_user.photo: local_user_photo = await client.download_media(message=chat_photo.big_file_id) buttons = [ [ InlineKeyboardButton( "πŸ” Close", callback_data=f"close#{message.from_user.id}" ) ] ] reply_markup = InlineKeyboardMarkup(buttons) await message.reply_photo( photo=local_user_photo, quote=True, reply_markup=reply_markup, caption=message_out_str, disable_notification=True, ) os.remove(local_user_photo) else: buttons = [ [ InlineKeyboardButton( "πŸ” Close", callback_data=f"close#{message.from_user.id}" ) ] ] reply_markup = InlineKeyboardMarkup(buttons) await message.reply_text( text=message_out_str, reply_markup=reply_markup, quote=True, disable_notification=True, ) await status_message.delete_msg() @app.on_callback_query(filters.regex("^close")) async def close_callback(_, query: CallbackQuery): _, userid = query.data.split("#") if query.from_user.id != int(userid): with contextlib.suppress(QueryIdInvalid): return await query.answer("⚠️ Access Denied!", True) with contextlib.suppress(Exception): await query.answer("Deleting this message in 5 seconds.") await asyncio.sleep(5) await query.message.delete_msg() await query.message.reply_to_message.delete_msg() async def mdlapi(title): link = f"https://kuryana.vercel.app/search/q/{title}" async with aiohttp.ClientSession() as ses, ses.get(link) as result: return await result.json() @app.on_message(filters.command(["mdl"], COMMAND_HANDLER)) @capture_err async def mdlsearch(_, message): if " " in message.text: _, title = message.text.split(None, 1) k = await message.reply("Sedang mencari di Database MyDramaList.. 😴") movies = await mdlapi(title) res = movies["results"]["dramas"] if not movies: return await k.edit("Tidak ada hasil ditemukan.. πŸ˜•") btn = [ [ InlineKeyboardButton( text=f"{movie.get('title')} ({movie.get('year')})", callback_data=f"mdls#{message.from_user.id}#{message.id}#{movie['slug']}", ) ] for movie in res ] await k.edit( f"Ditemukan {len(movies)} query dari {title}", reply_markup=InlineKeyboardMarkup(btn), ) else: await message.reply("Berikan aku nama drama yang ingin dicari. πŸ€·πŸ»β€β™‚οΈ") @app.on_callback_query(filters.regex("^mdls")) async def mdl_callback(_, query: CallbackQuery): _, user, _, slug = query.data.split("#") if user == f"{query.from_user.id}": await query.message.edit_text("Permintaan kamu sedang diproses.. ") result = "" try: res = (await fetch.get(f"https://kuryana.vercel.app/id/{slug}")).json() result += f"Title: {res['data']['title']}\n" result += ( f"AKA: {res['data']['others']['also_known_as']}\n\n" ) result += f"Rating: {res['data']['details']['score']}\n" result += f"Content Rating: {res['data']['details']['content_rating']}\n" result += f"Type: {res['data']['details']['type']}\n" result += ( f"Country: {res['data']['details']['country']}\n" ) if res["data"]["details"]["type"] == "Movie": result += f"Release Date: {res['data']['details']['release_date']}\n" elif res["data"]["details"]["type"] == "Drama": result += f"Episode: {res['data']['details']['episodes']}\n" result += ( f"Aired: {res['data']['details']['aired']}\n" ) try: result += f"Aired on: {res['data']['details']['aired_on']}\n" except: pass try: result += f"Original Network: {res['data']['details']['original_network']}\n" except: pass result += ( f"Duration: {res['data']['details']['duration']}\n" ) result += ( f"Genre: {res['data']['others']['genres']}\n\n" ) result += f"Synopsis: {res['data']['synopsis']}\n" result += f"Tags: {res['data']['others']['tags']}\n" btn = InlineKeyboardMarkup( [[InlineKeyboardButton("🎬 Open MyDramaList", url=res["data"]["link"])]] ) await query.message.edit_text(result, reply_markup=btn) except Exception as e: await query.message.edit_text(f"ERROR:\n{e}") else: await query.answer("Tombol ini bukan untukmu", show_alert=True)