# * @author Yasir Aris M # * @date 2023-06-21 22:12:27 # * @projectName MissKatyPyro # * Copyright ©YasirPedia All rights reserved import asyncio import math import os import re import time from datetime import datetime from logging import getLogger from urllib.parse import unquote from bs4 import BeautifulSoup from cloudscraper import create_scraper from pyrogram import filters from pyrogram.file_id import FileId from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from pySmartDL import SmartDL from misskaty import app from misskaty.core.decorator import capture_err, new_task from misskaty.helper.http import fetch from misskaty.helper.pyro_progress import humanbytes, progress_for_pyrogram from misskaty.vars import COMMAND_HANDLER, OWNER_ID LOGGER = getLogger("MissKaty") __MODULE__ = "Download/Upload" __HELP__ = """ /download [url] - Download file from URL (OWNER Only) /download [reply_to_TG_File] - Download TG File /tgraph_up [reply_to_TG_File] - Download TG File /tiktokdl [link] - Download TikTok Video, try use ytdown command if error. /fbdl [link] - Download Facebook Video. /instadl [link] - Download photo or video from instagram (Only first post). /twitterdl [link] - Dowload video from Twitter aka X. /anon [link] - Upload files to Anonfiles. /ytdown [YT-DLP Supported URL] - Downloading YT-DLP Supported Video and Audio. """ @app.on_message(filters.command(["anon"], COMMAND_HANDLER)) async def upload(bot, message): if not message.reply_to_message: return await message.reply("Please reply to media file.") vid = [ message.reply_to_message.video, message.reply_to_message.document, message.reply_to_message.audio, message.reply_to_message.photo, ] media = next((v for v in vid if v is not None), None) if not media: return await message.reply("Unsupported media type..") m = await message.reply("Download your file to my Server...") now = time.time() dc_id = FileId.decode(media.file_id).dc_id fileku = await message.reply_to_message.download( progress=progress_for_pyrogram, progress_args=("Trying to download, please wait..", m, now, dc_id), ) try: files = {"file": open(fileku, "rb")} await m.edit("Uploading to Anonfile, Please Wait||") callapi = await fetch.post("https://api.anonfiles.com/upload", files=files) text = callapi.json() output = f'File Uploaded to Anonfile\n\n📂 File Name: {text["data"]["file"]["metadata"]["name"]}\n\n📦 File Size: {text["data"]["file"]["metadata"]["size"]["readable"]}\n\n📥 Download Link: {text["data"]["file"]["url"]["full"]}' btn = InlineKeyboardMarkup( [ [ InlineKeyboardButton( "📥 Download 📥", url=f"{text['data']['file']['url']['full']}" ) ] ] ) await m.edit(output, reply_markup=btn) except Exception as e: await bot.send_message(message.chat.id, text=f"Something Went Wrong!\n\n{e}") os.remove(fileku) @app.on_message(filters.command(["download"], COMMAND_HANDLER) & filters.user(OWNER_ID)) @capture_err @new_task async def download(client, message): pesan = await message.reply_text("Processing...", quote=True) if message.reply_to_message is not None: start_t = datetime.now() c_time = time.time() vid = [ message.reply_to_message.video, message.reply_to_message.document, message.reply_to_message.audio, message.reply_to_message.photo, ] media = next((v for v in vid if v is not None), None) if not media: return await pesan.edit_msg("Unsupported media type..") dc_id = FileId.decode(media.file_id).dc_id the_real_download_location = await client.download_media( message=message.reply_to_message, progress=progress_for_pyrogram, progress_args=("Trying to download, sabar yakk..", pesan, c_time, dc_id), ) end_t = datetime.now() ms = (end_t - start_t).seconds await pesan.edit( f"Downloaded to {the_real_download_location} in {ms} seconds." ) elif len(message.command) > 1: start_t = datetime.now() the_url_parts = " ".join(message.command[1:]) url = the_url_parts.strip() custom_file_name = os.path.basename(url) if "|" in the_url_parts: url, custom_file_name = the_url_parts.split("|") url = url.strip() custom_file_name = custom_file_name.strip() download_file_path = os.path.join("downloads/", custom_file_name) downloader = SmartDL(url, download_file_path, progress_bar=False, timeout=10) try: downloader.start(blocking=False) except Exception as err: return await message.edit_msg(str(err)) c_time = time.time() while not downloader.isFinished(): total_length = downloader.filesize or None downloaded = downloader.get_dl_size(human=True) display_message = "" now = time.time() diff = now - c_time percentage = downloader.get_progress() * 100 speed = downloader.get_speed(human=True) progress_str = "[{0}{1}]\nProgress: {2}%".format( "".join(["●" for _ in range(math.floor(percentage / 5))]), "".join(["○" for _ in range(20 - math.floor(percentage / 5))]), round(percentage, 2), ) estimated_total_time = downloader.get_eta(human=True) try: current_message = "Trying to download...\n" current_message += f"URL: {url}\n" current_message += ( f"File Name: {unquote(custom_file_name)}\n" ) current_message += f"Speed: {speed}\n" current_message += f"{progress_str}\n" current_message += f"{downloaded} of {humanbytes(total_length)}\n" current_message += f"ETA: {estimated_total_time}" if round(diff % 10.00) == 0 and current_message != display_message: await pesan.edit( disable_web_page_preview=True, text=current_message ) display_message = current_message await asyncio.sleep(10) except Exception as e: LOGGER.info(str(e)) if os.path.exists(download_file_path): end_t = datetime.now() ms = (end_t - start_t).seconds await pesan.edit( f"Downloaded to {download_file_path} in {ms} seconds" ) else: await pesan.edit( "Reply to a Telegram Media, to download it to my local server." ) @app.on_message(filters.command(["instadl"], COMMAND_HANDLER)) @capture_err async def instadl(_, message): if len(message.command) == 1: return await message.reply( f"Use command /{message.command[0]} [link] to download Instagram Photo or Video." ) link = message.command[1] msg = await message.reply("Trying download...") try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:105.0) Gecko/20100101 Firefox/105.0", "Accept": "*/*", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate, br", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "X-Requested-With": "XMLHttpRequest", "Content-Length": "99", "Origin": "https://saveig.app", "Connection": "keep-alive", "Referer": "https://saveig.app/id", } post = create_scraper().post( "https://saveig.app/api/ajaxSearch", data={"q": link, "t": "media", "lang": "id"}, headers=headers, ) if post.status_code not in [200, 401]: return await message.reply("Unknown error.") res = post.json() if r := re.findall( r'href="(https?://(?!play\.google\.com|/)[^"]+)"', res["data"] ): res = r[0].replace("&", "&") fname = ( (await fetch.head(res)) .headers.get("content-disposition", "") .split("filename=")[1] ) is_img = ( (await fetch.head(res)).headers.get("content-type").startswith("image") ) if is_img: await message.reply_photo(res, caption=fname) else: await message.reply_video(res, caption=fname) await msg.delete() except Exception as e: await message.reply( f"Failed to download instagram video..\n\nReason: {e}" ) await msg.delete() @app.on_message(filters.command(["twitterdl"], COMMAND_HANDLER)) @capture_err async def twitterdl(_, message): if len(message.command) == 1: return await message.reply( f"Use command /{message.command[0]} [link] to download Twitter video." ) url = message.command[1] if "x.com" in url: url = url.replace("x.com", "twitter.com") msg = await message.reply("Trying download...") try: headers = { "Host": "ssstwitter.com", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:105.0) Gecko/20100101 Firefox/105.0", "Accept": "*/*", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate, br", "HX-Request": "true", "Origin": "https://ssstwitter.com", "Referer": "https://ssstwitter.com/id", "Cache-Control": "no-cache", } data = { "id": url, "locale": "id", "tt": "bc9841580b5d72e855e7d01bf3255278l", "ts": "1691416179", "source": "form", } post = await fetch.post( "https://ssstwitter.com/id", data=data, headers=headers, follow_redirects=True, ) if post.status_code not in [200, 401]: return await msg.edit_msg("Unknown error.") soup = BeautifulSoup(post.text, "lxml") cekdata = soup.find( "a", { "pure-button pure-button-primary is-center u-bl dl-button download_link without_watermark vignette_active" }, ) if not cekdata: return await message.reply( "ERROR: Oops! It seems that this tweet doesn't have a video! Try later or check your link" ) try: fname = ( (await fetch.head(cekdata.get("href"))) .headers.get("content-disposition", "") .split("filename=")[1] ) obj = SmartDL(cekdata.get("href"), progress_bar=False, timeout=15) obj.start() path = obj.get_dest() await message.reply_video( path, caption=f"{fname}\n\nUploaded for {message.from_user.mention} [{message.from_user.id}]", ) except Exception as er: LOGGER.error("ERROR: while fetching TwitterDL. %s", er) return await msg.edit_msg("ERROR: Got error while extracting link.") await msg.delete() except Exception as e: await message.reply(f"Failed to download twitter video..\n\nReason: {e}") await msg.delete() @app.on_message(filters.command(["tiktokdl"], COMMAND_HANDLER)) @capture_err async def tiktokdl(_, message): if len(message.command) == 1: return await message.reply( f"Use command /{message.command[0]} [link] to download tiktok video." ) link = message.command[1] msg = await message.reply("Trying download...") try: r = ( await fetch.post( "https://lovetik.com/api/ajax/search", data={"query": link} ) ).json() fname = (await fetch.head(r["links"][0]["a"])).headers.get( "content-disposition", "" ) filename = unquote(fname.split("filename=")[1].strip('"').split('"')[0]) await message.reply_video( r["links"][0]["a"], caption=f"Title: {filename}\nUploader: {r['author_name']}\n\nUploaded for {message.from_user.mention} [{message.from_user.id}]", ) await msg.delete() except Exception as e: await message.reply(f"Failed to download tiktok video..\n\nReason: {e}") await msg.delete() @app.on_message(filters.command(["fbdl"], COMMAND_HANDLER)) @capture_err @new_task async def fbdl(_, message): if len(message.command) == 1: return await message.reply( f"Use command /{message.command[0]} [link] to download Facebook video." ) link = message.command[1] msg = await message.reply("Trying download...") try: resjson = (await fetch.get(f"https://yasirapi.eu.org/fbdl?link={link}")).json() try: url = resjson["result"]["hd"] except KeyError: url = resjson["result"]["sd"] obj = SmartDL(url, progress_bar=False, timeout=15, verify=False) obj.start() path = obj.get_dest() await message.reply_video( path, caption=f"{os.path.basename(path)}\n\nUploaded for {message.from_user.mention} [{message.from_user.id}]", thumb="assets/thumb.jpg", ) await msg.delete() try: os.remove(path) except: pass except Exception as e: await message.reply( f"Failed to download Facebook video..\n\nReason: {e}" ) await msg.delete()