From bef041ba0abeb0fa6d36258b4758b0d73d91119f Mon Sep 17 00:00:00 2001 From: yasir Date: Tue, 27 Dec 2022 11:35:59 +0700 Subject: [PATCH] Add kang --- misskaty/plugins/stickers.py | 357 +++++++++++++++++++++++++++++++++++ 1 file changed, 357 insertions(+) create mode 100644 misskaty/plugins/stickers.py diff --git a/misskaty/plugins/stickers.py b/misskaty/plugins/stickers.py new file mode 100644 index 00000000..4e826635 --- /dev/null +++ b/misskaty/plugins/stickers.py @@ -0,0 +1,357 @@ +import os +import shutil +import tempfile +import asyncio +import re +import math + +from PIL import Image +from misskaty.helper.http import http +from pyrogram import filters, emoji, enums +from pyrogram.errors import PeerIdInvalid, StickersetInvalid, BadRequest +from pyrogram.raw.functions.messages import GetStickerSet, SendMedia +from pyrogram.raw.functions.stickers import AddStickerToSet, CreateStickerSet +from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup +from typing import Tuple, Callable +from functools import wraps, partial +from pyrogram.raw.types import ( + DocumentAttributeFilename, + InputDocument, + InputMediaUploadedDocument, + InputStickerSetItem, + InputStickerSetShortName, +) +from misskaty import app, BOT_USERNAME +from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL + + +def get_emoji_regex(): + e_list = [ + getattr(emoji, e).encode("unicode-escape").decode("ASCII") + for e in dir(emoji) + if not e.startswith("_") + ] + # to avoid re.error excluding char that start with '*' + e_sort = sorted([x for x in e_list if not x.startswith("*")], reverse=True) + # Sort emojis by length to make sure multi-character emojis are + # matched first + pattern_ = f"({'|'.join(e_sort)})" + return re.compile(pattern_) + + +EMOJI_PATTERN = get_emoji_regex() +SUPPORTED_TYPES = ["jpeg", "png", "webp"] + + +@app.on_message(filters.command(["getsticker"], COMMAND_HANDLER)) +async def getsticker_(c, m): + sticker = m.reply_to_message.sticker + + if sticker: + if sticker.is_animated: + await m.reply_text("Animated sticker is not supported!") + elif not sticker.is_animated: + with tempfile.TemporaryDirectory() as tempdir: + path = os.path.join(tempdir, "getsticker") + sticker_file = await c.download_media( + message=m.reply_to_message, + file_name=f"{path}/{sticker.set_name}.png", + ) + await m.reply_to_message.reply_document( + document=sticker_file, + caption=( + f"Emoji: {sticker.emoji}\n" + f"Sticker ID: {sticker.file_id}\n\n" + f"Send by: @{BOT_USERNAME}" + ), + ) + shutil.rmtree(tempdir, ignore_errors=True) + else: + await m.reply_text("This is not a sticker!") + + +@app.on_message(filters.command("stickerid", COMMAND_HANDLER) & filters.reply) +async def getstickerid(c, m): + if m.reply_to_message.sticker: + await m.reply_text( + "The ID of this sticker is: {stickerid}".format( + stickerid=m.reply_to_message.sticker.file_id + ) + ) + + +@app.on_message(filters.command(["curi", "kang"], COMMAND_HANDLER)) +async def kang_sticker(c, m): + if not message.from_user: + return await message.reply_text("You are anon admin, kang stickers in my pm.") + prog_msg = await m.reply_text("Kanging Sticker..") + sticker_emoji = "🤔" + packnum = 0 + packname_found = False + resize = False + animated = False + videos = False + convert = False + reply = m.reply_to_message + user = await c.resolve_peer(m.from_user.username or m.from_user.id) + + if reply and reply.media: + if reply.photo: + resize = True + elif reply.animation: + videos = True + convert = True + elif reply.video: + convert = True + videos = True + elif reply.document: + if "image" in reply.document.mime_type: + # mime_type: image/webp + resize = True + elif ( + MessageMediaType.VIDEO == reply.document.mime_type + or MessageMediaType.ANIMATION == reply.document.mime_type + ): + # mime_type: application/video + videos = True + convert = True + elif "tgsticker" in reply.document.mime_type: + # mime_type: application/x-tgsticker + animated = True + elif reply.sticker: + if not reply.sticker.file_name: + return await prog_msg.edit_text("The sticker has no name.") + if reply.sticker.emoji: + sticker_emoji = reply.sticker.emoji + animated = reply.sticker.is_animated + videos = reply.sticker.is_video + if videos: + convert = False + elif not reply.sticker.file_name.endswith(".tgs"): + resize = True + else: + return await prog_msg.edit_text() + + pack_prefix = "anim" if animated else "vid" if videos else "a" + packname = f"{pack_prefix}_{m.from_user.id}_by_{c.me.username}" + + if len(m.command) > 1 and m.command[1].isdigit() and int(m.command[1]) > 0: + # provide pack number to kang in desired pack + packnum = m.command.pop(1) + packname = f"{pack_prefix}{packnum}_{m.from_user.id}_by_{c.me.username}" + if len(m.command) > 1: + # matches all valid emojis in input + sticker_emoji = ( + "".join(set(EMOJI_PATTERN.findall("".join(m.command[1:])))) + or sticker_emoji + ) + filename = await c.download_media(m.reply_to_message) + if not filename: + # Failed to download + await prog_msg.delete() + return + elif m.entities and len(m.entities) > 1: + pack_prefix = "a" + filename = "sticker.png" + packname = f"c{m.from_user.id}_by_{c.me.username}" + img_url = next( + ( + m.text[y.offset : (y.offset + y.length)] + for y in m.entities + if y.type == "url" + ), + None, + ) + + if not img_url: + await prog_msg.delete() + return + try: + r = await http.get(img_url) + if r.status_code == 200: + with open(filename, mode="wb") as f: + f.write(r.read()) + except Exception as r_e: + return await prog_msg.edit_text(f"{r_e.__class__.__name__} : {r_e}") + if len(m.command) > 2: + # m.command[1] is image_url + if m.command[2].isdigit() and int(m.command[2]) > 0: + packnum = m.command.pop(2) + packname = f"a{packnum}_{m.from_user.id}_by_{c.me.username}" + if len(m.command) > 2: + sticker_emoji = ( + "".join(set(EMOJI_PATTERN.findall("".join(m.command[2:])))) + or sticker_emoji + ) + resize = True + else: + return await prog_msg.edit_text( + "Want me to guess the sticker? Please tag a sticker." + ) + try: + if resize: + filename = resize_image(filename) + elif convert: + filename = await convert_video(filename) + if filename is False: + return await prog_msg.edit_text("Error") + max_stickers = 50 if animated else 120 + while not packname_found: + try: + stickerset = await c.invoke( + GetStickerSet( + stickerset=InputStickerSetShortName(short_name=packname), + hash=0, + ) + ) + if stickerset.set.count >= max_stickers: + packnum += 1 + packname = ( + f"{pack_prefix}_{packnum}_{m.from_user.id}_by_{c.me.username}" + ) + else: + packname_found = True + except StickersetInvalid: + break + file = await c.save_file(filename) + media = await c.invoke( + SendMedia( + peer=(await c.resolve_peer(LOG_CHANNEL)), + media=InputMediaUploadedDocument( + file=file, + mime_type=c.guess_mime_type(filename), + attributes=[DocumentAttributeFilename(file_name=filename)], + ), + message=f"#Sticker kang by UserID -> {m.from_user.id}", + random_id=c.rnd_id(), + ), + ) + msg_ = media.updates[-1].message + stkr_file = msg_.media.document + if packname_found: + await prog_msg.edit_text("Using existing sticker pack...") + await c.invoke( + AddStickerToSet( + stickerset=InputStickerSetShortName(short_name=packname), + sticker=InputStickerSetItem( + document=InputDocument( + id=stkr_file.id, + access_hash=stkr_file.access_hash, + file_reference=stkr_file.file_reference, + ), + emoji=sticker_emoji, + ), + ) + ) + else: + await prog_msg.edit_text("Creating a new sticker pack...") + stkr_title = f"{m.from_user.first_name}'s " + if animated: + stkr_title += "MissKaty AnimPack" + elif videos: + stkr_title += "MissKaty VidPack" + if packnum != 0: + stkr_title += f" v{packnum}" + try: + await c.invoke( + CreateStickerSet( + user_id=user, + title=stkr_title, + short_name=packname, + stickers=[ + InputStickerSetItem( + document=InputDocument( + id=stkr_file.id, + access_hash=stkr_file.access_hash, + file_reference=stkr_file.file_reference, + ), + emoji=sticker_emoji, + ) + ], + animated=animated, + videos=videos, + ) + ) + except PeerIdInvalid: + return await prog_msg.edit_text( + "It looks like you've never interacted with me in private chat, you need to do that first..", + reply_markup=InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + "Click Me", + url=f"https://t.me/{c.me.username}?start", + ) + ] + ] + ), + ) + + except BadRequest: + return await prog_msg.edit_text( + "Your Sticker Pack is full if your pack is not in v1 Type /kang 1, if it is not in v2 Type /kang 2 and so on." + ) + except Exception as all_e: + await prog_msg.edit_text(f"{all_e.__class__.__name__} : {all_e}") + else: + await prog_msg.edit_text( + "Sticker successfully stolen!\nPack.\nEmoji: {}".format( + packname, sticker_emoji + ) + ) + # Cleanup + await c.delete_messages(chat_id=LOG_CHANNEL, message_ids=msg_.id, revoke=True) + try: + os.remove(filename) + except OSError: + pass + + +def resize_image(filename: str) -> str: + im = Image.open(filename) + maxsize = 512 + scale = maxsize / max(im.width, im.height) + sizenew = (int(im.width * scale), int(im.height * scale)) + im = im.resize(sizenew, Image.NEAREST) + downpath, f_name = os.path.split(filename) + # not hardcoding png_image as "sticker.png" + png_image = os.path.join(downpath, f"{f_name.split('.', 1)[0]}.png") + im.save(png_image, "PNG") + if png_image != filename: + os.remove(filename) + return png_image + + +async def convert_video(filename: str) -> str: + downpath, f_name = os.path.split(filename) + webm_video = os.path.join(downpath, f"{f_name.split('.', 1)[0]}.webm") + cmd = [ + "mediaextract", + "-loglevel", + "quiet", + "-i", + filename, + "-t", + "00:00:03", + "-vf", + "fps=30", + "-c:v", + "vp9", + "-b:v:", + "500k", + "-preset", + "ultrafast", + "-s", + "512x512", + "-y", + "-an", + webm_video, + ] + + proc = await asyncio.create_subprocess_exec(*cmd) + # Wait for the subprocess to finish + await proc.communicate() + + if webm_video != filename: + os.remove(filename) + return webm_video