Tes new genss and mediainfo

This commit is contained in:
yasir 2023-02-15 14:25:52 +07:00
parent 4aa94f1ca6
commit 0fbb0003eb
2 changed files with 49 additions and 85 deletions

View file

@ -16,6 +16,7 @@ from pyrogram.errors import MessageNotModified
from requests_toolbelt import MultipartEncoder from requests_toolbelt import MultipartEncoder
from misskaty import app from misskaty import app
from misskaty.core.message_utils import *
from misskaty.helper import SUPPORTED_URL_REGEX, progress_for_pyrogram from misskaty.helper import SUPPORTED_URL_REGEX, progress_for_pyrogram
from misskaty.vars import COMMAND_HANDLER from misskaty.vars import COMMAND_HANDLER
@ -25,7 +26,7 @@ async def slowpics_collection(message, file_name, path):
Uploads image(s) to https://slow.pics/ from a specified directory. Uploads image(s) to https://slow.pics/ from a specified directory.
""" """
msg = await message.reply_text("uploading generated screenshots to slow.pics.", quote=True) msg = await kirimPesan(message, "uploading generated screenshots to slow.pics.", quote=True)
img_list = os.listdir(path) img_list = os.listdir(path)
data = { data = {
@ -78,7 +79,7 @@ async def generate_ss_from_file(
Generates screenshots from partially/fully downloaded files using ffmpeg. Generates screenshots from partially/fully downloaded files using ffmpeg.
""" """
await replymsg.edit(f"Generating **{frame_count}** screnshots from `{unquote(file_name)}`, please wait...") await editPesan(replymsg, f"Generating **{frame_count}** screnshots from `{unquote(file_name)}`, please wait...")
rand_str = os.randstr() rand_str = os.randstr()
os.makedir(f"screenshot_{rand_str}") os.makedir(f"screenshot_{rand_str}")
@ -103,7 +104,7 @@ async def generate_ss_from_file(
loop_count += 1 loop_count += 1
loop_count -= 1 loop_count -= 1
await replymsg.delete() await hapusPesan(replymsg)
await slowpics_collection(message, file_name, path=f"{os.getcwd()}/screenshot_{rand_str}") await slowpics_collection(message, file_name, path=f"{os.getcwd()}/screenshot_{rand_str}")
shutil.rmtree(f"screenshot_{rand_str}") shutil.rmtree(f"screenshot_{rand_str}")
@ -123,7 +124,7 @@ async def generate_ss_from_link(
Generates screenshots from direct download links using ffmpeg. Generates screenshots from direct download links using ffmpeg.
""" """
await replymsg.edit(f"Generating **{frame_count}** screnshots from `{unquote(file_name)}`, please wait...") await editPesan(replymsg, f"Generating **{frame_count}** screnshots from `{unquote(file_name)}`, please wait...")
rand_str = os.randstr() rand_str = os.randstr()
os.makedir(f"screenshot_{rand_str}") os.makedir(f"screenshot_{rand_str}")
@ -142,11 +143,10 @@ async def generate_ss_from_link(
_, __ = await shell.communicate() _, __ = await shell.communicate()
loop_count -= 1 loop_count -= 1
time.sleep(3) await asyncio.sleep(3)
await replymsg.delete() await hapusPesan(replymsg)
await slowpics_collection(message, file_name, path=f"{os.getcwd()}/screenshot_{rand_str}") await slowpics_collection(message, file_name, path=f"{os.getcwd()}/screenshot_{rand_str}")
shutil.rmtree(f"screenshot_{rand_str}") shutil.rmtree(f"screenshot_{rand_str}")
@ -250,7 +250,7 @@ async def telegram_screenshot(client, message, frame_count):
@app.on_message(filters.command("genss2", COMMAND_HANDLER)) @app.on_message(filters.command("genss2", COMMAND_HANDLER))
async def screenshot(client, message): async def genscreenshotv2(client, message):
replied_message = message.reply_to_message replied_message = message.reply_to_message
if replied_message: if replied_message:
try: try:
@ -265,7 +265,7 @@ async def screenshot(client, message):
if len(message.command) < 2: if len(message.command) < 2:
mediainfo_usage = "Generates video frame screenshot from Telegram files or direct download links." mediainfo_usage = "Generates video frame screenshot from Telegram files or direct download links."
return await message.reply_text(mediainfo_usage, quote=True) return await kirimPesan(message, mediainfo_usage, quote=True)
user_input = message.text.split(None, 1)[1] user_input = message.text.split(None, 1)[1]
if "|" in user_input: if "|" in user_input:
@ -288,4 +288,4 @@ async def screenshot(client, message):
if bool(re.search(Rf"{key}", url)): if bool(re.search(Rf"{key}", url)):
if value == "ddl": if value == "ddl":
return await ddl_screenshot(message, frame_count, url) return await ddl_screenshot(message, frame_count, url)
return await message.reply_text("This type of link is not supported.", quote=True) return await kirimPesan(message, "This type of link is not supported.", quote=True)

View file

@ -1,18 +1,21 @@
from importlib.resources import contents
import json import json
import os import os
import re import re
import time
import subprocess import subprocess
import time
from urllib.parse import unquote from urllib.parse import unquote
import requests import requests
from pyrogram import filters from pyrogram import filters
from misskaty import app from misskaty import app
from misskaty.helper import (SUPPORTED_URL_REGEX, get_readable_bitrate, from misskaty.core.message_utils import *
get_readable_file_size, post_to_telegraph, from misskaty.core.decorator.ratelimiter import ratelimiter
progress_for_pyrogram, remove_N) from misskaty.helper import (SUPPORTED_URL_REGEX, post_to_telegraph,
progress_for_pyrogram, runcmd)
from misskaty.vars import COMMAND_HANDLER from misskaty.vars import COMMAND_HANDLER
from utils import get_file_id
async def ddl_mediainfo(_, message, url): async def ddl_mediainfo(_, message, url):
@ -22,49 +25,28 @@ async def ddl_mediainfo(_, message, url):
try: try:
filename = re.search(".+/(.+)", url).group(1) filename = re.search(".+/(.+)", url).group(1)
reply_msg = await message.reply_text("Generating Mediainfo, Please wait..", quote=True) reply_msg = await kirimPesan(message, "Generating Mediainfo, Please wait..", quote=True)
with requests.get(url, stream=True) as r: with requests.get(url, stream=True) as r:
with open(filename, 'wb') as f: with open(filename, 'wb') as f:
for chunk in r.iter_content(50000000): f.write(chunk); break for chunk in r.iter_content(50000000): f.write(chunk); break
mediainfo = subprocess.check_output(['mediainfo', filename]).decode("utf-8") output_ = await runcmd(f'mediainfo "{filename}"')
mediainfo_json = json.loads(subprocess.check_output(['mediainfo', filename, '--Output=JSON']).decode("utf-8")) out = output_[0] if len(output_) != 0 else None
content = f"""
filesize = requests.head(url).headers.get('content-length') MissKatyBot MediaInfo
lines = mediainfo.splitlines() DETAILS
for i in range(len(lines)): {out or 'Not Supported'}
if 'Complete name' in lines[i]: """
lines[i] = re.sub(r": .+", ': ' + unquote(filename), lines[i])
elif 'File size' in lines[i]:
lines[i] = re.sub(r": .+", ': ' + get_readable_file_size(float(filesize)), lines[i])
elif 'Overall bit rate' in lines[i] and 'Overall bit rate mode' not in lines[i]:
duration = float(mediainfo_json['media']['track'][0]['Duration'])
bitrate = get_readable_bitrate(float(filesize) * 8 / (duration * 1000))
lines[i] = re.sub(r": .+", ': ' + bitrate, lines[i])
elif 'IsTruncated' in lines[i] or 'FileExtension_Invalid' in lines[i]:
lines[i] = ''
with open(f'{filename}.txt', 'w') as f:
f.write('\n'.join(lines))
with open(f"{filename}.txt", "r+") as file:
content = file.read()
output = await post_to_telegraph(False, "MissKaty MediaInfo", content) output = await post_to_telegraph(False, "MissKaty MediaInfo", content)
await reply_msg.edit(f"**File Name :** `{unquote(filename)}`\n\n**Mediainfo :** {output}", await editPesan(reply_msg, f"**File Name :** `{unquote(filename)}`\n\n**Mediainfo :** {output}", disable_web_page_preview=True)
disable_web_page_preview=True)
os.remove(f"{filename}.txt")
os.remove(filename) os.remove(filename)
except: except:
await reply_msg.delete() await hapusPesan(reply_msg)
return await message.reply_text(f"Something went wrong while generating Mediainfo from the given url.", await kirimPesan(message, f"Something went wrong while generating Mediainfo from the given url.", quote=True)
quote=True)
async def telegram_mediainfo(client, message): async def telegram_mediainfo(client, message):
@ -75,7 +57,7 @@ async def telegram_mediainfo(client, message):
message = message.reply_to_message message = message.reply_to_message
if message.text: if message.text:
return await message.reply_text("Reply to a proper media file for generating Mediainfo.**", quote=True) return await kirimPesan(message, "Reply to a proper media file for generating Mediainfo.**", quote=True)
elif message.media.value == 'video': elif message.media.value == 'video':
media = message.video media = message.video
@ -90,13 +72,13 @@ async def telegram_mediainfo(client, message):
media = message.voice media = message.voice
else: else:
return await message.reply_text("This type of media is not supported for generating Mediainfo.**", quote=True) return await kirimPesan(message, "This type of media is not supported for generating Mediainfo.**", quote=True)
filename = str(media.file_name) filename = str(media.file_name)
mime = media.mime_type mime = media.mime_type
size = media.file_size size = media.file_size
reply_msg = await message.reply_text("Generating Mediainfo, Please wait..", quote=True) reply_msg = await kirimPesan(message, "Generating Mediainfo, Please wait..", quote=True)
if int(size) <= 50000000: if int(size) <= 50000000:
c_time = time.time() c_time = time.time()
@ -111,47 +93,29 @@ async def telegram_mediainfo(client, message):
with open(filename, 'ab') as f: with open(filename, 'ab') as f:
f.write(chunk) f.write(chunk)
mediainfo = subprocess.check_output(['mediainfo', filename]).decode("utf-8")
mediainfo_json = json.loads(subprocess.check_output(['mediainfo', filename, '--Output=JSON']).decode("utf-8"))
readable_size = get_readable_file_size(size)
try: try:
lines = mediainfo.splitlines() output_ = await runcmd(f'mediainfo "{filename}"')
for i in range(len(lines)): out = output_[0] if len(output_) != 0 else None
if 'File size' in lines[i]: file_info = get_file_id(message.reply_to_message)
lines[i] = re.sub(r": .+", ': ' + readable_size, lines[i]) content = f"""
MissKatyBot MediaInfo
elif 'Overall bit rate' in lines[i] and 'Overall bit rate mode' not in lines[i]: JSON
{file_info}.type
duration = float(mediainfo_json['media']['track'][0]['Duration'])
bitrate_kbps = (size * 8) / (duration * 1000) DETAILS
bitrate = get_readable_bitrate(bitrate_kbps) {out or 'Not Supported'}
"""
lines[i] = re.sub(r": .+", ': ' + bitrate, lines[i])
elif 'IsTruncated' in lines[i] or 'FileExtension_Invalid' in lines[i]:
lines[i] = ''
remove_N(lines)
with open(f'{filename}.txt', 'w') as f:
f.write('\n'.join(lines))
with open(f"{filename}.txt", "r+") as file:
content = file.read()
output = await post_to_telegraph(False, "MissKaty MediaInfo", content) output = await post_to_telegraph(False, "MissKaty MediaInfo", content)
await reply_msg.edit(f"**File Name :** `{filename}`\n\n**Mediainfo :** {output}", disable_web_page_preview=True) await editPesan(reply_msg, f"**File Name :** `{filename}`\n\n**Mediainfo :** {output}", disable_web_page_preview=True)
os.remove(f'{filename}.txt')
os.remove(filename) os.remove(filename)
except: except:
await reply_msg.delete() await hapusPesan(reply_msg)
await message.reply_text(f"Something went wrong while generating Mediainfo of replied Telegram file.", quote=True) await kirimPesan(message, f"Something went wrong while generating Mediainfo of replied Telegram file.", quote=True)
@ratelimiter
@app.on_message(filters.command("mediainfo2", COMMAND_HANDLER)) @app.on_message(filters.command("mediainfo2", COMMAND_HANDLER))
async def mediainfo(client, message): async def mediainfo(client, message):
mediainfo_usage = f"**Generate mediainfo from Telegram files or direct download links. Reply to any telegram file or just pass the link after the command." mediainfo_usage = f"**Generate mediainfo from Telegram files or direct download links. Reply to any telegram file or just pass the link after the command."
@ -160,11 +124,11 @@ async def mediainfo(client, message):
return await telegram_mediainfo(client, message) return await telegram_mediainfo(client, message)
elif len(message.command) < 2: elif len(message.command) < 2:
return await message.reply_text(mediainfo_usage, quote=True) return await kirimPesan(message, mediainfo_usage, quote=True)
user_url = message.text.split(None, 1)[1].split(" ")[0] user_url = message.text.split(None, 1)[1].split(" ")[0]
for (key, value) in SUPPORTED_URL_REGEX.items(): for (key, value) in SUPPORTED_URL_REGEX.items():
if bool(re.search(FR"{key}", user_url)): if bool(re.search(FR"{key}", user_url)):
if value == "ddl": if value == "ddl":
return await ddl_mediainfo(client, message, url=user_url) return await ddl_mediainfo(client, message, url=user_url)
await message.reply_text("This type of URL is not supported.", quote=True) await kirimPesan(message, "This type of URL is not supported.", quote=True)