""" * @author yasir * @date 2023-01-23 19:41:27 * @projectName MissKatyPyro * Copyright @YasirPedia All rights reserved """ from pykeyboard import InlineButton, InlineKeyboard from pyrogram import filters from pyrogram.errors import MessageTooLong, QueryIdInvalid from pyrogram.types import CallbackQuery, Message from misskaty import app from misskaty.helper import Cache, fetch, post_to_telegraph from misskaty.plugins.web_scraper import split_arr from misskaty.vars import COMMAND_HANDLER PYPI_DICT = Cache(filename="pypi_cache.db", path="cache", in_memory=False) async def getDataPypi(msg, kueri, CurrentPage, user): if not PYPI_DICT.get(msg.id): pypijson = (await fetch.get(f"https://yasirapi.eu.org/pypi?q={kueri}")).json() if not pypijson.get("result"): await msg.edit_msg("Sorry could not find any matching results!", del_in=6) return None, 0, None PYPI_DICT.add(msg.id, [split_arr(pypijson["result"], 6), kueri], timeout=1600) try: index = int(CurrentPage - 1) PageLen = len(PYPI_DICT[msg.id][0]) extractbtn = [] pypiResult = f"#Pypi Results For: {kueri}\n\n" for c, i in enumerate(PYPI_DICT[msg.id][0][index], start=1): pypiResult += f"{c}. {i['name']} {i['version']}\nCreated: {i['created']}\nDesc: {i['description']}\n\n" extractbtn.append( InlineButton(c, f"pypidata#{CurrentPage}#{c}#{user}#{msg.id}") ) pypiResult = "".join(i for i in pypiResult if i not in "[]") return pypiResult, PageLen, extractbtn except (IndexError, KeyError): await msg.edit_msg("Sorry could not find any matching results!", del_in=6) return None, 0, None @app.on_message(filters.command(["pypi"], COMMAND_HANDLER)) async def pypi_s(_, ctx: Message): kueri = " ".join(ctx.command[1:]) if not kueri: return await ctx.reply_msg( "Please add query after command. Ex: /pypi pyrogram", del_in=6 ) pesan = await ctx.reply_msg("âŗ Please wait, getting data from pypi..", quote=True) html = await fetch.get(f"https://pypi.org/pypi/{kueri}/json") if html.status_code != 200: return await pesan.edit_msg("Failed connect fo pypi server") res = html.json() requirement = ( "".join(f"{i}, " for i in res["info"].get("requires_dist")) if res["info"].get("requires_dist") else "Unknown" ) msg = "" msg += f"Package Name: {res['info'].get('name', 'Unknown')}\n" msg += f"Version: {res['info'].get('version', 'Unknown')}\n" msg += f"License: {res['info'].get('license', 'Unknown')}\n" msg += f"Author: {res['info'].get('author', 'Unknown')}\n" msg += f"Author Email: {res['info'].get('author_email', 'Unknown')}\n" msg += f"Requirements: {requirement}\n" msg += ( f"Requires Python: {res['info'].get('requires_python', 'Unknown')}\n" ) msg += f"HomePage: {res['info'].get('home_page', 'Unknown')}\n" msg += f"Bug Track: {res['info'].get('vulnerabilities', 'Unknown')}\n" if res["info"].get("project_urls"): msg += f"Docs Url: {res['info']['project_urls'].get('Documentation', 'Unknown')}\n" msg += f"Description: {res['info'].get('summary', 'Unknown')}\n" msg += ( f"Pip Command: pip3 install {res['info'].get('name', 'Unknown')}\n" ) msg += f"Keywords: {res['info'].get('keywords', 'Unknown')}\n" keyboard = InlineKeyboard() keyboard.row(InlineButton("❌ Close", f"close#{ctx.from_user.id}")) try: await pesan.edit_msg(msg, reply_markup=keyboard) except MessageTooLong: url = await post_to_telegraph(False, f"{res['info'].get('name')} {res['info'].get('version', None)}-detail", msg) await pesan.edit_msg(f"Result is too long:\n{url}", reply_markup=keyboard) @app.on_callback_query(filters.create(lambda _, __, query: "page_pypi#" in query.data)) async def pypipage_callback(_, callback_query: CallbackQuery): if callback_query.from_user.id != int(callback_query.data.split("#")[3]): return await callback_query.answer("Not yours..", True) message_id = int(callback_query.data.split("#")[2]) CurrentPage = int(callback_query.data.split("#")[1]) try: kueri = PYPI_DICT[message_id][1] except KeyError: return await callback_query.answer( "Invalid callback data, please send CMD again.." ) except QueryIdInvalid: return try: pypires, PageLen, btn = await getDataPypi( callback_query.message, kueri, CurrentPage, callback_query.from_user.id ) except TypeError: return keyboard = InlineKeyboard() keyboard.paginate( PageLen, CurrentPage, "page_pypi#{number}" + f"#{message_id}#{callback_query.from_user.id}", ) keyboard.row(InlineButton("👇 Extract Data ", "Hmmm")) keyboard.row(*btn) keyboard.row(InlineButton("❌ Close", f"close#{callback_query.from_user.id}")) await callback_query.message.edit_msg(pypires, reply_markup=keyboard) @app.on_callback_query(filters.create(lambda _, __, query: "pypidata#" in query.data)) async def pypi_getdata(_, callback_query: CallbackQuery): if callback_query.from_user.id != int(callback_query.data.split("#")[3]): return await callback_query.answer("Not yours..", True) idlink = int(callback_query.data.split("#")[2]) message_id = int(callback_query.data.split("#")[4]) CurrentPage = int(callback_query.data.split("#")[1]) try: pkgname = PYPI_DICT[message_id][0][CurrentPage - 1][idlink - 1].get("name") except KeyError: return await callback_query.answer( "Invalid callback data, please send CMD again.." ) keyboard = InlineKeyboard() keyboard.row( InlineButton( "â†Šī¸ Back", f"page_pypi#{CurrentPage}#{message_id}#{callback_query.from_user.id}", ), InlineButton("❌ Close", f"close#{callback_query.from_user.id}"), ) try: html = await fetch.get(f"https://pypi.org/pypi/{pkgname}/json") res = html.json() requirement = ( "".join(f"{i}, " for i in res["info"].get("requires_dist")) if res["info"].get("requires_dist") else "Unknown" ) msg = "" msg += f"Package Name: {res['info'].get('name', 'Unknown')}\n" msg += f"Version: {res['info'].get('version', 'Unknown')}\n" msg += f"License: {res['info'].get('license', 'Unknown')}\n" msg += f"Author: {res['info'].get('author', 'Unknown')}\n" msg += f"Author Email: {res['info'].get('author_email', 'Unknown')}\n" msg += f"Requirements: {requirement}\n" msg += ( f"Requires Python: {res['info'].get('requires_python', 'Unknown')}\n" ) msg += f"HomePage: {res['info'].get('home_page', 'Unknown')}\n" msg += f"Bug Track: {res['info'].get('vulnerabilities', 'Unknown')}\n" if res["info"].get("project_urls"): msg += f"Docs Url: {res['info']['project_urls'].get('Documentation', 'Unknown')}\n" msg += f"Description: {res['info'].get('summary', 'Unknown')}\n" msg += ( f"Pip Command: pip3 install {res['info'].get('name', 'Unknown')}\n" ) msg += f"Keywords: {res['info'].get('keywords', 'Unknown')}\n" try: await callback_query.message.edit_msg(msg, reply_markup=keyboard) except MessageTooLong: url = await post_to_telegraph(False, f"{pkgname}-detail", msg) await callback_query.message.edit_msg(f"Result is too long:\n{url}") except Exception as err: await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard)