import os
import textwrap
import time
from datetime import datetime, timedelta
from logging import getLogger

from PIL import Image, ImageChops, ImageDraw, ImageFont
from pyrogram import Client, enums, filters
from pyrogram.enums import ChatMemberStatus as CMS
from pyrogram.errors import (
    ChatAdminRequired,
    ChatSendPhotosForbidden,
    ChatWriteForbidden,
    MessageTooLong,
    RPCError,
)
from pyrogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup

from database.greetings_db import is_welcome, toggle_welcome
from database.users_chats_db import db
from misskaty import BOT_USERNAME, app
from misskaty.core.decorator import asyncify, capture_err
from misskaty.helper import fetch, use_chat_lang
from misskaty.vars import COMMAND_HANDLER, SUDO, SUPPORT_CHAT
from utils import temp

LOGGER = getLogger("MissKaty")


def circle(pfp, size=(215, 215)):
    """Return *pfp* resized to *size* with a circular alpha mask applied.

    The mask is drawn at three times the target size and scaled down with
    LANCZOS so the edge of the circle is anti-aliased.
    """
    pfp = pfp.resize(size, Image.LANCZOS).convert("RGBA")
    bigsize = (pfp.size[0] * 3, pfp.size[1] * 3)
    mask = Image.new("L", bigsize, 0)
    draw = ImageDraw.Draw(mask)
    draw.ellipse((0, 0) + bigsize, fill=255)
    mask = mask.resize(pfp.size, Image.LANCZOS)
    # Combine with the picture's own alpha channel so pixels that were already
    # transparent stay transparent inside the circle.
    mask = ImageChops.darker(mask, pfp.split()[-1])
    pfp.putalpha(mask)
    return pfp


def draw_multiple_line_text(image, text, font, text_start_height):
    """Draw *text* on *image*, wrapped at 50 characters and centred horizontally.

    Lines are drawn in black starting at y = *text_start_height*; each line
    advances y by the height of its own bounding box.

    From unutbu on [python PIL draw multiline text on image](https://stackoverflow.com/a/7698300/395857)
    """
    draw = ImageDraw.Draw(image)
    image_width, _ = image.size
    y_text = text_start_height
    for line in textwrap.wrap(text, width=50):
        left, top, right, bottom = font.getbbox(line)
        line_width = abs(right - left)
        line_height = abs(top - bottom)
        draw.text(
            ((image_width - line_width) / 2, y_text), line, font=font, fill="black"
        )
        y_text += line_height


@asyncify
def welcomepic(pic, user, chat, id, strings):
    """Render the welcome image for a new member and return its file path.

    Args:
        pic: Path of the member's profile picture.
        user: Display name inserted into the localized "welcpic_msg" template.
        chat: Chat title drawn at the top of the image.
        id: User id; used in the template and in the output file name.
        strings: Callable returning the localized template for a key.

    Returns:
        The path ``downloads/welcome#<id>.png`` of the saved image.
    """
    # Open the source images in context managers so their file handles are
    # released as soon as the pixel data has been copied by resize()/convert().
    with Image.open("assets/bg.png") as bg_src:  # Background image (should be PNG)
        background = bg_src.resize((1024, 500), Image.LANCZOS)
    with Image.open(pic) as pfp_src:
        pfp = pfp_src.convert("RGBA")
    pfp = circle(pfp)
    # Resize the profile picture so it fits the circle on the background.
    pfp = pfp.resize((265, 265))
    # Font of the text under the picture; change the size to taste.
    font = ImageFont.truetype("assets/Calistoga-Regular.ttf", 37)
    member_text = strings("welcpic_msg").format(userr=user, id=id)
    draw_multiple_line_text(background, member_text, font, 395)
    draw_multiple_line_text(background, chat, font, 47)
    # NOTE(review): `size` is not a documented ImageDraw.text() parameter; it is
    # kept unchanged here - confirm whether it has any effect.
    ImageDraw.Draw(background).text(
        (530, 460),
        f"Generated by @{BOT_USERNAME}",
        font=ImageFont.truetype("assets/Calistoga-Regular.ttf", 28),
        size=20,
        align="right",
    )
    # Paste the profile picture using its own alpha channel as the mask.
    background.paste(pfp, (379, 123), pfp)
    out_path = f"downloads/welcome#{id}.png"
    background.save(out_path)
    return out_path


def _remove_files(*paths):
    """Best-effort removal of each path; one missing file does not skip the rest."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            pass


@app.on_chat_member_updated(filters.group, group=6)
@use_chat_lang()
async def member_has_joined(c: Client, member: ChatMemberUpdated, strings):
    """Greet a newly joined member with a generated picture and run a CAS check.

    Only updates that carry a new, non-restricted member and no previous
    member state are handled, and only in chats where ``is_welcome`` is true.
    SUDO users get a plain text greeting; bots are ignored.
    """
    if not (
        member.new_chat_member
        and member.new_chat_member.status not in {CMS.RESTRICTED}
        and not member.old_chat_member
    ):
        return
    if not await is_welcome(member.chat.id):
        return

    user = member.new_chat_member.user if member.new_chat_member else member.from_user
    if user.id in SUDO:
        await c.send_message(
            member.chat.id,
            strings("sudo_join_msg"),
        )
        return
    if user.is_bot:
        return  # ignore bots

    welcome_key = f"welcome-{member.chat.id}"
    # Delete the previous welcome message of this chat, if one is remembered.
    if temp.MELCOW.get(welcome_key) is not None:
        try:
            await temp.MELCOW[welcome_key].delete()
        except Exception:
            # Best effort: the old message may already be gone.
            pass

    mention = f"{user.first_name}"
    try:
        pic = await app.download_media(
            user.photo.big_file_id, file_name=f"pp{user.id}.png"
        )
    except AttributeError:
        # user.photo is None when the member has no profile picture.
        pic = "assets/profilepic.png"

    try:
        welcomeimg = await welcomepic(
            pic, user.first_name, member.chat.title, user.id, strings
        )
        temp.MELCOW[welcome_key] = await c.send_photo(
            member.chat.id,
            photo=welcomeimg,
            caption=strings("capt_welc").format(
                umention=mention, uid=user.id, ttl=member.chat.title
            ),
        )
    except Exception as e:
        LOGGER.info(e)

    userspammer = ""
    # Combot Anti-Spam (CAS) detection
    try:
        apicombot = (
            await fetch.get(f"https://api.cas.chat/check?user_id={user.id}")
        ).json()
        # "ok" is decoded from JSON, so a positive answer is the boolean True;
        # the string form is still accepted for backward compatibility.
        if apicombot.get("ok") in (True, "true"):
            # NOTE(review): Telegram documents that bans shorter than 30 seconds
            # are treated as permanent - confirm this window is what is intended.
            await app.ban_chat_member(
                member.chat.id, user.id, datetime.now() + timedelta(seconds=30)
            )
            userspammer += strings("combot_msg").format(
                umention=user.mention, uid=user.id
            )
    except Exception as err:
        LOGGER.error(f"ERROR: {err}")
    if userspammer != "":
        await c.send_message(member.chat.id, userspammer)

    # Remove the temporary files independently so a missing welcome image does
    # not leave the downloaded profile picture behind.
    _remove_files(f"downloads/welcome#{user.id}.png", f"downloads/pp{user.id}.png")


@app.on_cmd(["toggle_welcome"], self_admin=True, group_only=True)
@app.adminsOnly("can_change_info")
async def welcome_toggle_handler(client, message):
    """Toggle welcome messages for the current chat and report the new state."""
    is_enabled = await toggle_welcome(message.chat.id)
    await message.reply_msg(
        f"Welcome messages are now {'enabled' if is_enabled else 'disabled'}."
    )


@app.on_message(filters.command("leave") & filters.user(SUDO))
async def leave_a_chat(bot, message):
    """Send a farewell message to the given chat and make the bot leave it.

    The chat is taken from the first command argument; it is converted to an
    int when numeric and otherwise passed through unchanged.
    """
    if len(message.command) == 1:
        return await message.reply("Give me a chat id")
    chat = message.command[1]
    try:
        chat = int(chat)
    except ValueError:
        # Not numeric: keep the raw argument as the chat identifier.
        pass

    reply_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton("Support", url=f"https://t.me/{SUPPORT_CHAT}")]]
    )
    # The farewell message is best effort; the bot must still leave if it fails.
    try:
        await bot.send_message(
            chat_id=chat,
            text="Hai kawan, \nOwner aku bilang saya harus pergi! Jika kamu ingin menambahkan bot ini lagi silahkan kontak owner bot ini.",
            reply_markup=reply_markup,
        )
    except Exception as e:
        await message.reply(f"Error - {e}")
    try:
        await bot.leave_chat(chat)
    except Exception as e:
        await message.reply(f"Error - {e}")


# Not to be used
# @app.on_message(filters.command('invite') & filters.user(SUDO))
async def gen_invite(bot, message):
    """Create an invite link for the chat id given as the first command argument."""
    if len(message.command) == 1:
        return await message.reply("Give me a chat id")
    chat = message.command[1]
    try:
        chat = int(chat)
    except ValueError:
        return await message.reply("Give Me A Valid Chat ID")
    try:
        link = await bot.create_chat_invite_link(chat)
    except ChatAdminRequired:
        return await message.reply(
            "Invite Link Generation Failed, Iam Not Having Sufficient Rights"
        )
    except Exception as e:
        return await message.reply(f"Error {e}")
    await message.reply(f"Here is your Invite Link {link.invite_link}")


@app.on_message(filters.command(["adminlist"], COMMAND_HANDLER))
@capture_err
async def adminlist(_, message):
    """Reply with the list of administrators of the current group."""
    if message.chat.type == enums.ChatType.PRIVATE:
        return await message.reply("Perintah ini hanya untuk grup")
    try:
        msg = await message.reply_msg(f"Getting admin list in {message.chat.title}..")
        administrators = []
        async for m in app.get_chat_members(
            message.chat.id, filter=enums.ChatMembersFilter.ADMINISTRATORS
        ):
            uname = f"@{m.user.username}" if m.user.username else ""
            administrators.append(f"{m.user.first_name} [{uname}]")
        res = "".join(f"💠 {i}\n" for i in administrators)
        return await msg.edit_msg(
            f"Admin in {message.chat.title} ({message.chat.id}):\n{res}"
        )
    except Exception as e:
        await message.reply(f"ERROR: {e}")


@app.on_message(filters.command(["kickme"], COMMAND_HANDLER))
@capture_err
async def kickme(_, message):
    """Kick the sender from the chat (ban, announce, then unban).

    Any text after the command is reported as the reason; "-" is shown when
    no reason was given.
    """
    reason = None
    if len(message.text.split()) >= 2:
        reason = message.text.split(None, 1)[1]
    try:
        await message.chat.ban_member(message.from_user.id)
        try:
            txt = f"Pengguna {message.from_user.mention} menendang dirinya sendiri. Mungkin dia sedang frustasi 😕"
            txt += f"\nAlasan: {reason or '-'}"
            await message.reply_text(txt)
        finally:
            # Always lift the ban, even if the announcement could not be sent,
            # so a "kick" never turns into a lasting ban.
            await message.chat.unban_member(message.from_user.id)
    except RPCError as ef:
        await message.reply_text(
            f"Sepertinya ada error, silahkan report ke owner saya. \nERROR: {str(ef)}"
        )
    except Exception as err:
        await message.reply(f"ERROR: {err}")


@app.on_message(filters.command("users") & filters.user(SUDO))
async def list_users(_, message):
    """List the users stored in the database, as a message or as a text file."""
    # https://t.me/GetTGLink/4184
    msg = await message.reply("Getting List Of Users")
    users = await db.get_all_users()
    rows = ["Users Saved In DB Are:\n\n"]
    async for user in users:
        row = f"User ID: {user.get('id')} -> {user.get('name')}"
        if user["ban_status"]["is_banned"]:
            row += "( Banned User )"
        rows.append(row + "\n")
    out = "".join(rows)
    try:
        await msg.edit_text(out)
    except MessageTooLong:
        # Too long for a single message: send the list as a document instead.
        # Explicit UTF-8 so non-ASCII names do not depend on the locale.
        with open("users.txt", "w+", encoding="utf-8") as outfile:
            outfile.write(out)
        await message.reply_document("users.txt", caption="List Of Users")
        await msg.delete_msg()


@app.on_message(filters.command("chats") & filters.user(SUDO))
async def list_chats(_, message):
    """List the chats stored in the database, as a message or as a text file."""
    msg = await message.reply("Getting List Of chats")
    chats = await db.get_all_chats()
    rows = ["Chats Saved In DB Are:\n\n"]
    async for chat in chats:
        row = f"Title: {chat.get('title')} ({chat.get('id')}) "
        if chat["chat_status"]["is_disabled"]:
            row += "( Disabled Chat )"
        rows.append(row + "\n")
    out = "".join(rows)
    try:
        await msg.edit_text(out)
    except MessageTooLong:
        # Too long for a single message: send the list as a document instead.
        # Explicit UTF-8 so non-ASCII titles do not depend on the locale.
        with open("chats.txt", "w+", encoding="utf-8") as outfile:
            outfile.write(out)
        await message.reply_document("chats.txt", caption="List Of Chats")
        await msg.delete_msg()