MissKatyPyro/misskaty/plugins/stickers.py
2023-01-09 06:01:38 +00:00

364 lines
13 KiB
Python

import os
import shutil
import tempfile
import asyncio
import re
from PIL import Image
from misskaty.helper.http import http
from pyrogram import emoji, filters
from pyrogram.file_id import FileId
from pyrogram.errors import PeerIdInvalid, StickersetInvalid, BadRequest
from pyrogram.raw.functions.messages import GetStickerSet, SendMedia
from pyrogram.raw.functions.stickers import (
AddStickerToSet,
CreateStickerSet,
RemoveStickerFromSet,
)
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from pyrogram.raw.types import (
DocumentAttributeFilename,
InputDocument,
InputMediaUploadedDocument,
InputStickerSetItem,
InputStickerSetShortName,
)
from misskaty import app, BOT_USERNAME
from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL
__MODULE__ = "Stickers"
__HELP__ = """
/kang [Reply to sticker] - Add sticker to your pack.
/unkang [Reply to sticker] - Remove sticker from your pack.
/getsticker - Convert sticker to png.
/stickerid - View sticker ID
"""
def get_emoji_regex():
e_list = [getattr(emoji, e).encode("unicode-escape").decode("ASCII") for e in dir(emoji) if not e.startswith("_")]
# to avoid re.error excluding char that start with '*'
e_sort = sorted([x for x in e_list if not x.startswith("*")], reverse=True)
# Sort emojis by length to make sure multi-character emojis are
# matched first
pattern_ = f"({'|'.join(e_sort)})"
return re.compile(pattern_)
EMOJI_PATTERN = get_emoji_regex()
SUPPORTED_TYPES = ["jpeg", "png", "webp"]
@app.on_message(filters.command(["getsticker"], COMMAND_HANDLER))
async def getsticker_(c, m):
sticker = m.reply_to_message.sticker
if sticker:
if sticker.is_animated:
await m.reply_text("Animated sticker is not supported!")
elif not sticker.is_animated:
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "getsticker")
sticker_file = await c.download_media(
message=m.reply_to_message,
file_name=f"{path}/{sticker.set_name}.png",
)
await m.reply_to_message.reply_document(
document=sticker_file,
caption=(f"<b>Emoji:</b> {sticker.emoji}\n" f"<b>Sticker ID:</b> <code>{sticker.file_id}</code>\n\n" f"<b>Send by:</b> @{BOT_USERNAME}"),
)
shutil.rmtree(tempdir, ignore_errors=True)
else:
await m.reply_text("This is not a sticker!")
@app.on_message(filters.command("stickerid", COMMAND_HANDLER) & filters.reply)
async def getstickerid(c, m):
if m.reply_to_message.sticker:
await m.reply_text("The ID of this sticker is: <code>{stickerid}</code>".format(stickerid=m.reply_to_message.sticker.file_id))
@app.on_message(filters.command("unkang", COMMAND_HANDLER) & filters.reply)
async def getstickerid(c, m):
if m.reply_to_message.sticker:
pp = await m.reply_text("Trying to remove from pack..")
try:
decoded = FileId.decode(m.reply_to_message.sticker.file_id)
sticker = InputDocument(
id=decoded.media_id,
access_hash=decoded.access_hash,
file_reference=decoded.file_reference,
)
await app.invoke(RemoveStickerFromSet(sticker=sticker))
await pp.edit("Sticker has been removed from your pack")
except Exception as e:
await pp.edit(f"Failed remove sticker from your pack.\n\nERR: {e}")
else:
await m.reply_text(f"Please reply sticker that created by {c.me.username} to remove sticker from your pack.")
@app.on_message(filters.command(["curi", "kang"], COMMAND_HANDLER))
async def kang_sticker(c, m):
if not m.from_user:
return await m.reply_text("You are anon admin, kang stickers in my pm.")
prog_msg = await m.reply_text("Trying to steal your sticker...")
sticker_emoji = "🤔"
packnum = 0
packname_found = False
resize = False
animated = False
videos = False
convert = False
reply = m.reply_to_message
user = await c.resolve_peer(m.from_user.username or m.from_user.id)
if reply and reply.media:
if reply.photo:
resize = True
elif reply.animation:
videos = True
convert = True
elif reply.video:
convert = True
videos = True
elif reply.document:
if "image" in reply.document.mime_type:
# mime_type: image/webp
resize = True
elif MessageMediaType.VIDEO == reply.document.mime_type or MessageMediaType.ANIMATION == reply.document.mime_type:
# mime_type: application/video
videos = True
convert = True
elif "tgsticker" in reply.document.mime_type:
# mime_type: application/x-tgsticker
animated = True
elif reply.sticker:
if not reply.sticker.file_name:
return await prog_msg.edit_text("The sticker has no name.")
if reply.sticker.emoji:
sticker_emoji = reply.sticker.emoji
animated = reply.sticker.is_animated
videos = reply.sticker.is_video
if videos:
convert = False
elif not reply.sticker.file_name.endswith(".tgs"):
resize = True
else:
return await prog_msg.edit_text()
pack_prefix = "anim" if animated else "vid" if videos else "a"
packname = f"{pack_prefix}_{m.from_user.id}_by_{c.me.username}"
if len(m.command) > 1 and m.command[1].isdigit() and int(m.command[1]) > 0:
# provide pack number to kang in desired pack
packnum = m.command.pop(1)
packname = f"{pack_prefix}{packnum}_{m.from_user.id}_by_{c.me.username}"
if len(m.command) > 1:
# matches all valid emojis in input
sticker_emoji = "".join(set(EMOJI_PATTERN.findall("".join(m.command[1:])))) or sticker_emoji
filename = await c.download_media(m.reply_to_message)
if not filename:
# Failed to download
await prog_msg.delete()
return
elif m.entities and len(m.entities) > 1:
pack_prefix = "a"
filename = "sticker.png"
packname = f"c{m.from_user.id}_by_{c.me.username}"
img_url = next(
(m.text[y.offset : (y.offset + y.length)] for y in m.entities if y.type == "url"),
None,
)
if not img_url:
await prog_msg.delete()
return
try:
r = await http.get(img_url)
if r.status_code == 200:
with open(filename, mode="wb") as f:
f.write(r.read())
except Exception as r_e:
return await prog_msg.edit_text(f"{r_e.__class__.__name__} : {r_e}")
if len(m.command) > 2:
# m.command[1] is image_url
if m.command[2].isdigit() and int(m.command[2]) > 0:
packnum = m.command.pop(2)
packname = f"a{packnum}_{m.from_user.id}_by_{c.me.username}"
if len(m.command) > 2:
sticker_emoji = "".join(set(EMOJI_PATTERN.findall("".join(m.command[2:])))) or sticker_emoji
resize = True
else:
return await prog_msg.edit_text("Want me to guess the sticker? Please tag a sticker.")
try:
if resize:
filename = resize_image(filename)
elif convert:
filename = await convert_video(filename)
if filename is False:
return await prog_msg.edit_text("Error")
max_stickers = 50 if animated else 120
while not packname_found:
try:
stickerset = await c.invoke(
GetStickerSet(
stickerset=InputStickerSetShortName(short_name=packname),
hash=0,
)
)
if stickerset.set.count >= max_stickers:
packnum += 1
packname = f"{pack_prefix}_{packnum}_{m.from_user.id}_by_{c.me.username}"
else:
packname_found = True
except StickersetInvalid:
break
file = await c.save_file(filename)
media = await c.invoke(
SendMedia(
peer=(await c.resolve_peer(LOG_CHANNEL)),
media=InputMediaUploadedDocument(
file=file,
mime_type=c.guess_mime_type(filename),
attributes=[DocumentAttributeFilename(file_name=filename)],
),
message=f"#Sticker kang by UserID -> {m.from_user.id}",
random_id=c.rnd_id(),
),
)
msg_ = media.updates[-1].message
stkr_file = msg_.media.document
if packname_found:
await prog_msg.edit_text("<code>Using existing sticker pack...</code>")
await c.invoke(
AddStickerToSet(
stickerset=InputStickerSetShortName(short_name=packname),
sticker=InputStickerSetItem(
document=InputDocument(
id=stkr_file.id,
access_hash=stkr_file.access_hash,
file_reference=stkr_file.file_reference,
),
emoji=sticker_emoji,
),
)
)
else:
await prog_msg.edit_text("<b>Creating a new sticker pack...</b>")
stkr_title = f"{m.from_user.first_name}'s"
if animated:
stkr_title += "AnimPack"
elif videos:
stkr_title += "VidPack"
if packnum != 0:
stkr_title += f" v{packnum}"
try:
await c.invoke(
CreateStickerSet(
user_id=user,
title=stkr_title,
short_name=packname,
stickers=[
InputStickerSetItem(
document=InputDocument(
id=stkr_file.id,
access_hash=stkr_file.access_hash,
file_reference=stkr_file.file_reference,
),
emoji=sticker_emoji,
)
],
animated=animated,
videos=videos,
)
)
except PeerIdInvalid:
return await prog_msg.edit_text(
"It looks like you've never interacted with me in private chat, you need to do that first..",
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"Click Me",
url=f"https://t.me/{c.me.username}?start",
)
]
]
),
)
except BadRequest:
return await prog_msg.edit_text("Your Sticker Pack is full if your pack is not in v1 Type /kang 1, if it is not in v2 Type /kang 2 and so on.")
except Exception as all_e:
await prog_msg.edit_text(f"{all_e.__class__.__name__} : {all_e}")
else:
markup = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="👀 View Your Pack",
url=f"https://t.me/addstickers/{packname}",
)
]
]
)
await prog_msg.edit_text(
f"<b>Sticker successfully stolen!</b>\n<b>Emoji:</b> {sticker_emoji}",
reply_markup=markup,
)
# Cleanup
await c.delete_messages(chat_id=LOG_CHANNEL, message_ids=msg_.id, revoke=True)
try:
os.remove(filename)
except OSError:
pass
def resize_image(filename: str) -> str:
im = Image.open(filename)
maxsize = 512
scale = maxsize / max(im.width, im.height)
sizenew = (int(im.width * scale), int(im.height * scale))
im = im.resize(sizenew, Image.NEAREST)
downpath, f_name = os.path.split(filename)
# not hardcoding png_image as "sticker.png"
png_image = os.path.join(downpath, f"{f_name.split('.', 1)[0]}.png")
im.save(png_image, "PNG")
if png_image != filename:
os.remove(filename)
return png_image
async def convert_video(filename: str) -> str:
downpath, f_name = os.path.split(filename)
webm_video = os.path.join(downpath, f"{f_name.split('.', 1)[0]}.webm")
cmd = [
"mediaextract",
"-loglevel",
"quiet",
"-i",
filename,
"-t",
"00:00:03",
"-vf",
"fps=30",
"-c:v",
"vp9",
"-b:v:",
"500k",
"-preset",
"ultrafast",
"-s",
"512x512",
"-y",
"-an",
webm_video,
]
proc = await asyncio.create_subprocess_exec(*cmd)
# Wait for the subprocess to finish
await proc.communicate()
if webm_video != filename:
os.remove(filename)
return webm_video