MissKatyPyro/misskaty/plugins/stickers.py
yasirarism 3a6c0b424e
MultiLanguage Support [BETA] (#21)
Pushed to public since i dont know when became stable
2023-03-28 13:15:39 +07:00

365 lines
13 KiB
Python

import asyncio
import os
import re
import shutil
import tempfile
from PIL import Image
from pyrogram import emoji, filters, enums
from pyrogram.errors import BadRequest, PeerIdInvalid, StickersetInvalid
from pyrogram.file_id import FileId
from pyrogram.raw.functions.messages import GetStickerSet, SendMedia
from pyrogram.raw.functions.stickers import AddStickerToSet, CreateStickerSet, RemoveStickerFromSet
from pyrogram.raw.types import DocumentAttributeFilename, InputDocument, InputMediaUploadedDocument, InputStickerSetItem, InputStickerSetShortName
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from misskaty import BOT_USERNAME, app
from misskaty.core.decorator.ratelimiter import ratelimiter
from misskaty.helper.http import http
from misskaty.helper.localization import use_chat_lang
from misskaty.vars import COMMAND_HANDLER, LOG_CHANNEL
__MODULE__ = "Stickers"
__HELP__ = """
/kang [Reply to sticker] - Add sticker to your pack.
/unkang [Reply to sticker] - Remove sticker from your pack (Only can remove sticker that added by this bot.).
/getsticker - Convert sticker to png.
/stickerid - View sticker ID
"""
def get_emoji_regex():
e_list = [getattr(emoji, e).encode("unicode-escape").decode("ASCII") for e in dir(emoji) if not e.startswith("_")]
# to avoid re.error excluding char that start with '*'
e_sort = sorted([x for x in e_list if not x.startswith("*")], reverse=True)
# Sort emojis by length to make sure multi-character emojis are
# matched first
pattern_ = f"({'|'.join(e_sort)})"
return re.compile(pattern_)
EMOJI_PATTERN = get_emoji_regex()
SUPPORTED_TYPES = ["jpeg", "png", "webp"]
@app.on_message(filters.command(["getsticker"], COMMAND_HANDLER))
@ratelimiter
@use_chat_lang()
async def getsticker_(c, m, strings):
if sticker := m.reply_to_message.sticker:
if sticker.is_animated:
await m.reply_text(strings("no_anim_stick"))
else:
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "getsticker")
sticker_file = await c.download_media(
message=m.reply_to_message,
file_name=f"{path}/{sticker.set_name}.png",
)
await m.reply_to_message.reply_document(
document=sticker_file,
caption=f"<b>Emoji:</b> {sticker.emoji}\n" f"<b>Sticker ID:</b> <code>{sticker.file_id}</code>\n\n" f"<b>Send by:</b> @{BOT_USERNAME}",
)
shutil.rmtree(tempdir, ignore_errors=True)
else:
await m.reply_text(strings("not_sticker"))
@app.on_message(filters.command("stickerid", COMMAND_HANDLER) & filters.reply)
@ratelimiter
async def getstickerid(c, m):
if m.reply_to_message.sticker:
await m.reply_text("The ID of this sticker is: <code>{stickerid}</code>".format(stickerid=m.reply_to_message.sticker.file_id))
@app.on_message(filters.command("unkang", COMMAND_HANDLER) & filters.reply)
@ratelimiter
@use_chat_lang()
async def getstickerid(c, m, strings):
if m.reply_to_message.sticker:
pp = await m.reply_text(strings("unkang_msg"))
try:
decoded = FileId.decode(m.reply_to_message.sticker.file_id)
sticker = InputDocument(
id=decoded.media_id,
access_hash=decoded.access_hash,
file_reference=decoded.file_reference,
)
await app.invoke(RemoveStickerFromSet(sticker=sticker))
await pp.edit(strings("unkang_success"))
except Exception as e:
await pp.edit(strings("unkang_error").format(e=e))
else:
await m.reply_text(strings("unkang_help").format(c=c.me.username))
@app.on_message(filters.command(["curi", "kang"], COMMAND_HANDLER))
@ratelimiter
@use_chat_lang()
async def kang_sticker(c, m, strings):
if not m.from_user:
return await m.reply_text(strings("anon_warn"))
prog_msg = await m.reply_text(strings("kang_msg"))
sticker_emoji = "🤔"
packnum = 0
packname_found = False
resize = False
animated = False
videos = False
convert = False
reply = m.reply_to_message
user = await c.resolve_peer(m.from_user.username or m.from_user.id)
if reply and reply.media:
if reply.photo:
resize = True
elif reply.animation:
videos = True
convert = True
elif reply.video:
convert = True
videos = True
elif reply.document:
if "image" in reply.document.mime_type:
# mime_type: image/webp
resize = True
elif reply.document.mime_type in (
enums.MessageMediaType.VIDEO,
enums.MessageMediaType.ANIMATION,
):
# mime_type: application/video
videos = True
convert = True
elif "tgsticker" in reply.document.mime_type:
# mime_type: application/x-tgsticker
animated = True
elif reply.sticker:
if not reply.sticker.file_name:
return await prog_msg.edit_text(strings("stick_no_name"))
if reply.sticker.emoji:
sticker_emoji = reply.sticker.emoji
animated = reply.sticker.is_animated
videos = reply.sticker.is_video
if videos:
convert = False
elif not reply.sticker.file_name.endswith(".tgs"):
resize = True
else:
return await prog_msg.edit_text()
pack_prefix = "anim" if animated else "vid" if videos else "a"
packname = f"{pack_prefix}_{m.from_user.id}_by_{c.me.username}"
if len(m.command) > 1 and m.command[1].isdigit() and int(m.command[1]) > 0:
# provide pack number to kang in desired pack
packnum = m.command.pop(1)
packname = f"{pack_prefix}{packnum}_{m.from_user.id}_by_{c.me.username}"
if len(m.command) > 1:
# matches all valid emojis in input
sticker_emoji = "".join(set(EMOJI_PATTERN.findall("".join(m.command[1:])))) or sticker_emoji
filename = await c.download_media(m.reply_to_message)
if not filename:
# Failed to download
await prog_msg.delete()
return
elif m.entities and len(m.entities) > 1:
pack_prefix = "a"
filename = "sticker.png"
packname = f"c{m.from_user.id}_by_{c.me.username}"
img_url = next(
(m.text[y.offset : (y.offset + y.length)] for y in m.entities if y.type == "url"),
None,
)
if not img_url:
await prog_msg.delete()
return
try:
r = await http.get(img_url)
if r.status_code == 200:
with open(filename, mode="wb") as f:
f.write(r.read())
except Exception as r_e:
return await prog_msg.edit_text(f"{r_e.__class__.__name__} : {r_e}")
if len(m.command) > 2:
# m.command[1] is image_url
if m.command[2].isdigit() and int(m.command[2]) > 0:
packnum = m.command.pop(2)
packname = f"a{packnum}_{m.from_user.id}_by_{c.me.username}"
if len(m.command) > 2:
sticker_emoji = "".join(set(EMOJI_PATTERN.findall("".join(m.command[2:])))) or sticker_emoji
resize = True
else:
return await prog_msg.edit_text(strings("kang_help"))
try:
if resize:
filename = resize_image(filename)
elif convert:
filename = await convert_video(filename)
if filename is False:
return await prog_msg.edit_text("Error")
max_stickers = 50 if animated else 120
while not packname_found:
try:
stickerset = await c.invoke(
GetStickerSet(
stickerset=InputStickerSetShortName(short_name=packname),
hash=0,
)
)
if stickerset.set.count >= max_stickers:
packnum += 1
packname = f"{pack_prefix}_{packnum}_{m.from_user.id}_by_{c.me.username}"
else:
packname_found = True
except StickersetInvalid:
break
file = await c.save_file(filename)
media = await c.invoke(
SendMedia(
peer=(await c.resolve_peer(LOG_CHANNEL)),
media=InputMediaUploadedDocument(
file=file,
mime_type=c.guess_mime_type(filename),
attributes=[DocumentAttributeFilename(file_name=filename)],
),
message=f"#Sticker kang by UserID -> {m.from_user.id}",
random_id=c.rnd_id(),
),
)
msg_ = media.updates[-1].message
stkr_file = msg_.media.document
if packname_found:
await prog_msg.edit_text(strings("exist_pack"))
await c.invoke(
AddStickerToSet(
stickerset=InputStickerSetShortName(short_name=packname),
sticker=InputStickerSetItem(
document=InputDocument(
id=stkr_file.id,
access_hash=stkr_file.access_hash,
file_reference=stkr_file.file_reference,
),
emoji=sticker_emoji,
),
)
)
else:
await prog_msg.edit_text(strings("new_packs"))
stkr_title = f"{m.from_user.first_name}'s"
if animated:
stkr_title += "AnimPack"
elif videos:
stkr_title += "VidPack"
if packnum != 0:
stkr_title += f" v{packnum}"
try:
await c.invoke(
CreateStickerSet(
user_id=user,
title=stkr_title,
short_name=packname,
stickers=[
InputStickerSetItem(
document=InputDocument(
id=stkr_file.id,
access_hash=stkr_file.access_hash,
file_reference=stkr_file.file_reference,
),
emoji=sticker_emoji,
)
],
animated=animated,
videos=videos,
)
)
except PeerIdInvalid:
return await prog_msg.edit_text(
strings("please_start_msg"),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
strings("click_me"),
url=f"https://t.me/{c.me.username}?start",
)
]
]
),
)
except BadRequest:
return await prog_msg.edit_text(strings("pack_full"))
except Exception as all_e:
await prog_msg.edit_text(f"{all_e.__class__.__name__} : {all_e}")
else:
markup = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text=strings("viewpack"),
url=f"https://t.me/addstickers/{packname}",
)
]
]
)
await prog_msg.edit_text(
strings("kang_success").format(emot=sticker_emoji),
reply_markup=markup,
)
# Cleanup
await c.delete_messages(chat_id=LOG_CHANNEL, message_ids=msg_.id, revoke=True)
try:
os.remove(filename)
except OSError:
pass
def resize_image(filename: str) -> str:
im = Image.open(filename)
maxsize = 512
scale = maxsize / max(im.width, im.height)
sizenew = (int(im.width * scale), int(im.height * scale))
im = im.resize(sizenew, Image.NEAREST)
downpath, f_name = os.path.split(filename)
# not hardcoding png_image as "sticker.png"
png_image = os.path.join(downpath, f"{f_name.split('.', 1)[0]}.png")
im.save(png_image, "PNG")
if png_image != filename:
os.remove(filename)
return png_image
async def convert_video(filename: str) -> str:
downpath, f_name = os.path.split(filename)
webm_video = os.path.join(downpath, f"{f_name.split('.', 1)[0]}.webm")
cmd = [
"mediaextract",
"-loglevel",
"quiet",
"-i",
filename,
"-t",
"00:00:03",
"-vf",
"fps=30",
"-c:v",
"vp9",
"-b:v:",
"500k",
"-preset",
"ultrafast",
"-s",
"512x512",
"-y",
"-an",
webm_video,
]
proc = await asyncio.create_subprocess_exec(*cmd)
# Wait for the subprocess to finish
await proc.communicate()
if webm_video != filename:
os.remove(filename)
return webm_video