import logging import os import random import re import string import time import cv2 import numpy as np from http.cookies import SimpleCookie from pyrogram import utils from re import match as re_match from typing import Union from urllib.parse import urlparse import psutil from misskaty import BOT_NAME, UBOT_NAME, botStartTime from misskaty.helper.http import fetch from misskaty.helper.human_read import get_readable_time from misskaty.plugins import ALL_MODULES LOGGER = logging.getLogger("MissKaty") URL_REGEX = r"(http|ftp|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])" GENRES_EMOJI = { "Action": "👊", "Adventure": random.choice(["🪂", "🧗‍♀", "🌋"]), "Family": "👨‍", "Musical": "🎸", "Comedy": "🤣", "Drama": " 🎭", "Ecchi": random.choice(["💋", "🥵"]), "Fantasy": random.choice(["🧞", "🧞‍♂", "🧞‍♀", "🌗"]), "Hentai": "🔞", "History": "📜", "Horror": "☠", "Mahou Shoujo": "☯", "Mecha": "🤖", "Music": "🎸", "Mystery": "🔮", "Psychological": "♟", "Romance": "💞", "Sci-Fi": "🛸", "Slice of Life": random.choice(["☘", "🍁"]), "Sports": "⚽️", "Supernatural": "🫧", "Thriller": random.choice(["🥶", "🔪", "🤯"]), } def is_url(url): url = re_match(URL_REGEX, url) return bool(url) async def bot_sys_stats(): bot_uptime = int(time.time() - botStartTime) cpu = psutil.cpu_percent(interval=0.5) mem = psutil.virtual_memory().percent disk = psutil.disk_usage("/").percent process = psutil.Process(os.getpid()) return f""" {UBOT_NAME}@{BOT_NAME} ------------------ UPTIME: {get_readable_time(bot_uptime)} BOT: {round(process.memory_info()[0] / 1024**2)} MB CPU: {cpu}% RAM: {mem}% DISK: {disk}% TOTAL PLUGINS: {len(ALL_MODULES)} """ def remove_N(seq): i = 1 while i < len(seq): if seq[i] == seq[i - 1]: del seq[i] i -= 1 else: i += 1 def get_random_string(length: int = 5): text_str = "".join( random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(length) ) return text_str.upper() async def rentry(teks): # buat dapetin cookie cookie = SimpleCookie() kuki = (await fetch.get("https://rentry.co")).cookies cookie.load(kuki) kukidict = {key: value.value for key, value in cookie.items()} # headernya header = {"Referer": "https://rentry.co"} payload = {"csrfmiddlewaretoken": kukidict["csrftoken"], "text": teks} return ( ( await fetch.post( "https://rentry.co/api/new", data=payload, headers=header, cookies=kukidict, ) ) .json() .get("url") ) def get_provider(url): def pretty(names): name = names[1] if names[0] == "play": name = "Google Play Movies" elif names[0] == "hbogoasia": name = "HBO Go Asia" elif names[0] == "maxstream": name = "Max Stream" elif names[0] == "klikfilm": name = "Klik Film" return name.title() netloc = urlparse(url).netloc return pretty(netloc.split(".")) async def search_jw(movie_name: str, locale: Union[str, None] = "ID"): m_t_ = "" try: response = ( await fetch.get( f"https://yasirapi.eu.org/justwatch?q={movie_name}&locale={locale}" ) ).json() except: return m_t_ if not response.get("results"): LOGGER.error("JustWatch API Error or got Rate Limited.") return m_t_ for item in response["results"]["data"]["popularTitles"]["edges"]: if item["node"]["content"]["title"] == movie_name: t_m_ = [] for offer in item["node"].get("offers", []): url = offer["standardWebURL"] if url not in t_m_: p_o = get_provider(url) m_t_ += f"{p_o} | " t_m_.append(url) if m_t_ != "": m_t_ = m_t_[:-2].strip() break return m_t_ def isValidURL(str): # Regex to check valid URL regex = ("((http|https)://)(www.)?" + "[a-zA-Z0-9@:%._\\+~#?&//=]" + "{2,256}\\.[a-z]" + "{2,6}\\b([-a-zA-Z0-9@:%" + "._\\+~#?&//=]*)") # Compile the ReGex p = re.compile(regex) # If the string is empty # return false return False if str is None else bool((re.search(p, str))) async def gen_trans_image(msg, path): # Download image dl = await msg.download() # load image img = await utils.run_sync(cv2.imread(dl)) # convert to graky gray = await utils.run_sync(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)) # threshold input image as mask mask = await utils.run_sync(cv2.threshold(gray, 250, 255, cv2.THRESH_BINARY)[1]) # negate mask mask = 255 - mask # apply morphology to remove isolated extraneous noise # use borderconstant of black since foreground touches the edges kernel = np.ones((3,3), np.uint8) mask = await utils.run_sync(cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)) mask = await utils.run_sync(cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)) # anti-alias the mask -- blur then stretch # blur alpha channel mask = await utils.run_sync(cv2.GaussianBlur(mask, (0,0), sigmaX=2, sigmaY=2, borderType = cv2.BORDER_DEFAULT)) # linear stretch so that 127.5 goes to 0, but 255 stays 255 mask = (2*(mask.astype(np.float32))-255.0).clip(0,255).astype(np.uint8) # put mask into alpha channel result = img.copy() result = await utils.run_sync(cv2.cvtColor(result, cv2.COLOR_BGR2BGRA)) result[:, :, 3] = mask # save resulting masked image await utils.run_sync(cv2.imwrite(path, result)) os.remove(dl) return path