From 298a076362f581c9046703cdc5c6645396d70e89 Mon Sep 17 00:00:00 2001 From: Yasir Aris M Date: Wed, 18 Oct 2023 11:26:06 +0700 Subject: [PATCH] Fix httpx exception and use contextlib to handle some exception Signed-off-by: Yasir Aris M --- misskaty/plugins/imdb_search.py | 932 ++++++++++++++++---------------- misskaty/plugins/misc_tools.py | 23 +- misskaty/plugins/web_scraper.py | 422 ++++++++------- 3 files changed, 701 insertions(+), 676 deletions(-) diff --git a/misskaty/plugins/imdb_search.py b/misskaty/plugins/imdb_search.py index b59c1576..593fd8dc 100644 --- a/misskaty/plugins/imdb_search.py +++ b/misskaty/plugins/imdb_search.py @@ -2,10 +2,12 @@ # * @date 2023-06-21 22:12:27 # * @projectName MissKatyPyro # * Copyright Β©YasirPedia All rights reserved +import contextlib import httpx import json import logging import re +import sys from urllib.parse import quote_plus from bs4 import BeautifulSoup @@ -135,56 +137,57 @@ async def imdb_search_id(kueri, message): ) msg = "" buttons = InlineKeyboard(row_width=4) - try: - r = await fetch.get( - f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" - ) - r.raise_for_status() - res = r.json().get("d") - if not res: - return await k.edit_caption( - f"⛔️ Tidak ditemukan hasil untuk kueri: {kueri}" + with contextlib.redirect_stdout(sys.stderr): + try: + r = await fetch.get( + f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" ) - msg += f"🎬 Ditemukan ({len(res)}) hasil untuk kueri: {kueri}\n\n" - for num, movie in enumerate(res, start=1): - title = movie.get("l") - if year := movie.get("yr"): - year = f"({year})" - elif year := movie.get("y"): - year = f"({year})" - else: - year = "(N/A)" - typee = movie.get("q", "N/A").replace("feature", "movie").title() - movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] - msg += f"{num}. {title} {year} - {typee}\n" - BTN.append( - InlineKeyboardButton( - text=num, - callback_data=f"imdbres_id#{message.from_user.id}#{movieID}", + r.raise_for_status() + res = r.json().get("d") + if not res: + return await k.edit_caption( + f"⛔️ Tidak ditemukan hasil untuk kueri: {kueri}" + ) + msg += f"🎬 Ditemukan ({len(res)}) hasil untuk kueri: {kueri}\n\n" + for num, movie in enumerate(res, start=1): + title = movie.get("l") + if year := movie.get("yr"): + year = f"({year})" + elif year := movie.get("y"): + year = f"({year})" + else: + year = "(N/A)" + typee = movie.get("q", "N/A").replace("feature", "movie").title() + movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] + msg += f"{num}. {title} {year} - {typee}\n" + BTN.append( + InlineKeyboardButton( + text=num, + callback_data=f"imdbres_id#{message.from_user.id}#{movieID}", + ) + ) + BTN.extend( + ( + InlineKeyboardButton( + text="🚩 Language", + callback_data=f"imdbset#{message.from_user.id}", + ), + InlineKeyboardButton( + text="❌ Close", + callback_data=f"close#{message.from_user.id}", + ), ) ) - BTN.extend( - ( - InlineKeyboardButton( - text="🚩 Language", - callback_data=f"imdbset#{message.from_user.id}", - ), - InlineKeyboardButton( - text="❌ Close", - callback_data=f"close#{message.from_user.id}", - ), + buttons.add(*BTN) + await k.edit_caption(msg, reply_markup=buttons) + except httpx.HTTPError as exc: + await k.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) + except (MessageIdInvalid, MessageNotModified): + pass + except Exception as err: + await k.edit_caption( + f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\nERROR: {err}" ) - ) - buttons.add(*BTN) - await k.edit_caption(msg, reply_markup=buttons) - except httpx.HTTPError as exc: - await k.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) - except (MessageIdInvalid, MessageNotModified): - pass - except Exception as err: - await k.edit_caption( - f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\nERROR: {err}" - ) async def imdb_search_en(kueri, message): @@ -196,56 +199,57 @@ async def imdb_search_en(kueri, message): ) msg = "" buttons = InlineKeyboard(row_width=4) - try: - r = await fetch.get( - f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" - ) - r.raise_for_status() - res = r.json().get("d") - if not res: - return await k.edit_caption( - f"⛔️ Result not found for keywords: {kueri}" + with contextlib.redirect_stdout(sys.stderr): + try: + r = await fetch.get( + f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" ) - msg += f"🎬 Found ({len(res)}) result for keywords: {kueri}\n\n" - for num, movie in enumerate(res, start=1): - title = movie.get("l") - if year := movie.get("yr"): - year = f"({year})" - elif year := movie.get("y"): - year = f"({year})" - else: - year = "(N/A)" - typee = movie.get("q", "N/A").replace("feature", "movie").title() - movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] - msg += f"{num}. {title} {year} - {typee}\n" - BTN.append( - InlineKeyboardButton( - text=num, - callback_data=f"imdbres_en#{message.from_user.id}#{movieID}", + r.raise_for_status() + res = r.json().get("d") + if not res: + return await k.edit_caption( + f"⛔️ Result not found for keywords: {kueri}" + ) + msg += f"🎬 Found ({len(res)}) result for keywords: {kueri}\n\n" + for num, movie in enumerate(res, start=1): + title = movie.get("l") + if year := movie.get("yr"): + year = f"({year})" + elif year := movie.get("y"): + year = f"({year})" + else: + year = "(N/A)" + typee = movie.get("q", "N/A").replace("feature", "movie").title() + movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] + msg += f"{num}. {title} {year} - {typee}\n" + BTN.append( + InlineKeyboardButton( + text=num, + callback_data=f"imdbres_en#{message.from_user.id}#{movieID}", + ) + ) + BTN.extend( + ( + InlineKeyboardButton( + text="🚩 Language", + callback_data=f"imdbset#{message.from_user.id}", + ), + InlineKeyboardButton( + text="❌ Close", + callback_data=f"close#{message.from_user.id}", + ), ) ) - BTN.extend( - ( - InlineKeyboardButton( - text="🚩 Language", - callback_data=f"imdbset#{message.from_user.id}", - ), - InlineKeyboardButton( - text="❌ Close", - callback_data=f"close#{message.from_user.id}", - ), + buttons.add(*BTN) + await k.edit_caption(msg, reply_markup=buttons) + except httpx.HTTPError as exc: + await k.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) + except (MessageIdInvalid, MessageNotModified): + pass + except Exception as err: + await k.edit_caption( + f"Failed when requesting movies title. Maybe got rate limit or down.\n\nERROR: {err}" ) - ) - buttons.add(*BTN) - await k.edit_caption(msg, reply_markup=buttons) - except httpx.HTTPError as exc: - await k.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) - except (MessageIdInvalid, MessageNotModified): - pass - except Exception as err: - await k.edit_caption( - f"Failed when requesting movies title. Maybe got rate limit or down.\n\nERROR: {err}" - ) @app.on_cb("imdbcari") @@ -268,51 +272,52 @@ async def imdbcari(_, query: CallbackQuery): pass msg = "" buttons = InlineKeyboard(row_width=4) - try: - r = await fetch.get( - f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" - ) - r.raise_for_status() - res = r.json().get("d") - if not res: - return await query.message.edit_caption( - f"⛔️ Tidak ditemukan hasil untuk kueri: {kueri}" + with contextlib.redirect_stdout(sys.stderr): + try: + r = await fetch.get( + f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" ) - msg += f"🎬 Ditemukan ({len(res)}) hasil dari: {kueri} ~ {query.from_user.mention}\n\n" - for num, movie in enumerate(res, start=1): - title = movie.get("l") - if year := movie.get("yr"): - year = f"({year})" - elif year := movie.get("y"): - year = f"({year})" - else: - year = "(N/A)" - typee = movie.get("q", "N/A").replace("feature", "movie").title() - movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] - msg += f"{num}. {title} {year} - {typee}\n" - BTN.append( - InlineKeyboardButton( - text=num, callback_data=f"imdbres_id#{uid}#{movieID}" + r.raise_for_status() + res = r.json().get("d") + if not res: + return await query.message.edit_caption( + f"⛔️ Tidak ditemukan hasil untuk kueri: {kueri}" + ) + msg += f"🎬 Ditemukan ({len(res)}) hasil dari: {kueri} ~ {query.from_user.mention}\n\n" + for num, movie in enumerate(res, start=1): + title = movie.get("l") + if year := movie.get("yr"): + year = f"({year})" + elif year := movie.get("y"): + year = f"({year})" + else: + year = "(N/A)" + typee = movie.get("q", "N/A").replace("feature", "movie").title() + movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] + msg += f"{num}. {title} {year} - {typee}\n" + BTN.append( + InlineKeyboardButton( + text=num, callback_data=f"imdbres_id#{uid}#{movieID}" + ) + ) + BTN.extend( + ( + InlineKeyboardButton( + text="🚩 Language", callback_data=f"imdbset#{uid}" + ), + InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"), ) ) - BTN.extend( - ( - InlineKeyboardButton( - text="🚩 Language", callback_data=f"imdbset#{uid}" - ), - InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"), + buttons.add(*BTN) + await query.message.edit_caption(msg, reply_markup=buttons) + except httpx.HTTPError as exc: + await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) + except (MessageIdInvalid, MessageNotModified): + pass + except Exception as err: + await query.message.edit_caption( + f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\nERROR: {err}" ) - ) - buttons.add(*BTN) - await query.message.edit_caption(msg, reply_markup=buttons) - except httpx.HTTPError as exc: - await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) - except (MessageIdInvalid, MessageNotModified): - pass - except Exception as err: - await query.message.edit_caption( - f"Ooppss, gagal mendapatkan daftar judul di IMDb. Mungkin terkena rate limit atau down.\n\nERROR: {err}" - ) else: if query.from_user.id != int(uid): return await query.answer("⚠️ Access Denied!", True) @@ -324,51 +329,52 @@ async def imdbcari(_, query: CallbackQuery): await query.message.edit_caption("πŸ”Ž Looking in the IMDB Database..") msg = "" buttons = InlineKeyboard(row_width=4) - try: - r = await fetch.get( - f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" - ) - r.raise_for_status() - res = r.json().get("d") - if not res: - return await query.message.edit_caption( - f"⛔️ Result not found for keywords: {kueri}" + with contextlib.redirect_stdout(sys.stderr): + try: + r = await fetch.get( + f"https://v3.sg.media-imdb.com/suggestion/titles/x/{quote_plus(kueri)}.json" ) - msg += f"🎬 Found ({len(res)}) result for keywords: {kueri} ~ {query.from_user.mention}\n\n" - for num, movie in enumerate(res, start=1): - title = movie.get("l") - if year := movie.get("yr"): - year = f"({year})" - elif year := movie.get("y"): - year = f"({year})" - else: - year = "(N/A)" - typee = movie.get("q", "N/A").replace("feature", "movie").title() - movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] - msg += f"{num}. {title} {year} - {typee}\n" - BTN.append( - InlineKeyboardButton( - text=num, callback_data=f"imdbres_en#{uid}#{movieID}" + r.raise_for_status() + res = r.json().get("d") + if not res: + return await query.message.edit_caption( + f"⛔️ Result not found for keywords: {kueri}" + ) + msg += f"🎬 Found ({len(res)}) result for keywords: {kueri} ~ {query.from_user.mention}\n\n" + for num, movie in enumerate(res, start=1): + title = movie.get("l") + if year := movie.get("yr"): + year = f"({year})" + elif year := movie.get("y"): + year = f"({year})" + else: + year = "(N/A)" + typee = movie.get("q", "N/A").replace("feature", "movie").title() + movieID = re.findall(r"tt(\d+)", movie.get("id"))[0] + msg += f"{num}. {title} {year} - {typee}\n" + BTN.append( + InlineKeyboardButton( + text=num, callback_data=f"imdbres_en#{uid}#{movieID}" + ) + ) + BTN.extend( + ( + InlineKeyboardButton( + text="🚩 Language", callback_data=f"imdbset#{uid}" + ), + InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"), ) ) - BTN.extend( - ( - InlineKeyboardButton( - text="🚩 Language", callback_data=f"imdbset#{uid}" - ), - InlineKeyboardButton(text="❌ Close", callback_data=f"close#{uid}"), + buttons.add(*BTN) + await query.message.edit_caption(msg, reply_markup=buttons) + except httpx.HTTPError as exc: + await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) + except (MessageIdInvalid, MessageNotModified): + pass + except Exception as err: + await query.message.edit_caption( + f"Failed when requesting movies title. Maybe got rate limit or down.\n\nERROR: {err}" ) - ) - buttons.add(*BTN) - await query.message.edit_caption(msg, reply_markup=buttons) - except httpx.HTTPError as exc: - await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) - except (MessageIdInvalid, MessageNotModified): - pass - except Exception as err: - await query.message.edit_caption( - f"Failed when requesting movies title. Maybe got rate limit or down.\n\nERROR: {err}" - ) @app.on_cb("imdbres_id") @@ -376,160 +382,161 @@ async def imdb_id_callback(self: Client, query: CallbackQuery): i, userid, movie = query.data.split("#") if query.from_user.id != int(userid): return await query.answer("⚠️ Akses Ditolak!", True) - try: - await query.message.edit_caption("⏳ Permintaan kamu sedang diproses.. ") - imdb_url = f"https://www.imdb.com/title/tt{movie}/" - resp = await fetch.get(imdb_url) - resp.raise_for_status() - sop = BeautifulSoup(resp, "lxml") - r_json = json.loads( - sop.find("script", attrs={"type": "application/ld+json"}).contents[0] - ) - ott = await search_jw(r_json.get("name"), "ID") - typee = r_json.get("@type", "") - res_str = "" - tahun = ( - re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0] - if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text) - else "N/A" - ) - res_str += f"πŸ“Ή Judul: {r_json.get('name')} [{tahun}] ({typee})\n" - if aka := r_json.get("alternateName"): - res_str += f"πŸ“’ AKA: {aka}\n\n" - else: - res_str += "\n" - if durasi := sop.select('li[data-testid="title-techspec_runtime"]'): - durasi = ( - durasi[0].find(class_="ipc-metadata-list-item__content-container").text + with contextlib.redirect_stdout(sys.stderr): + try: + await query.message.edit_caption("⏳ Permintaan kamu sedang diproses.. ") + imdb_url = f"https://www.imdb.com/title/tt{movie}/" + resp = await fetch.get(imdb_url) + resp.raise_for_status() + sop = BeautifulSoup(resp, "lxml") + r_json = json.loads( + sop.find("script", attrs={"type": "application/ld+json"}).contents[0] ) - res_str += f"Durasi: {GoogleTranslator('auto', 'id').translate(durasi)}\n" - if kategori := r_json.get("contentRating"): - res_str += f"Kategori: {kategori} \n" - if rating := r_json.get("aggregateRating"): - res_str += f"Peringkat: {rating['ratingValue']}⭐️ dari {rating['ratingCount']} pengguna\n" - if release := sop.select('li[data-testid="title-details-releasedate"]'): - rilis = ( - release[0] - .find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ott = await search_jw(r_json.get("name"), "ID") + typee = r_json.get("@type", "") + res_str = "" + tahun = ( + re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0] + if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text) + else "N/A" + ) + res_str += f"πŸ“Ή Judul: {r_json.get('name')} [{tahun}] ({typee})\n" + if aka := r_json.get("alternateName"): + res_str += f"πŸ“’ AKA: {aka}\n\n" + else: + res_str += "\n" + if durasi := sop.select('li[data-testid="title-techspec_runtime"]'): + durasi = ( + durasi[0].find(class_="ipc-metadata-list-item__content-container").text ) - .text - ) - rilis_url = release[0].find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - )["href"] - res_str += ( - f"Rilis: {rilis}\n" - ) - if genre := r_json.get("genre"): - genre = "".join( - f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " - if i in GENRES_EMOJI - else f"#{i.replace('-', '_').replace(' ', '_')}, " - for i in r_json["genre"] - ) - res_str += f"Genre: {genre[:-2]}\n" - if negara := sop.select('li[data-testid="title-details-origin"]'): - country = "".join( - f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " - for country in negara[0].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + res_str += f"Durasi: {GoogleTranslator('auto', 'id').translate(durasi)}\n" + if kategori := r_json.get("contentRating"): + res_str += f"Kategori: {kategori} \n" + if rating := r_json.get("aggregateRating"): + res_str += f"Peringkat: {rating['ratingValue']}⭐️ dari {rating['ratingCount']} pengguna\n" + if release := sop.select('li[data-testid="title-details-releasedate"]'): + rilis = ( + release[0] + .find( + class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ) + .text ) - ) - res_str += f"Negara: {country[:-2]}\n" - if bahasa := sop.select('li[data-testid="title-details-languages"]'): - language = "".join( - f"#{lang.text.replace(' ', '_').replace('-', '_')}, " - for lang in bahasa[0].findAll( + rilis_url = release[0].find( class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + )["href"] + res_str += ( + f"Rilis: {rilis}\n" ) - ) - res_str += f"Bahasa: {language[:-2]}\n" - res_str += "\nπŸ™Ž Info Cast:\n" - if directors := r_json.get("director"): - director = "".join( - f"{i['name']}, " for i in directors - ) - res_str += f"Sutradara: {director[:-2]}\n" - if creators := r_json.get("creator"): - creator = "".join( - f"{i['name']}, " - for i in creators - if i["@type"] == "Person" - ) - res_str += f"Penulis: {creator[:-2]}\n" - if actors := r_json.get("actor"): - actor = "".join(f"{i['name']}, " for i in actors) - res_str += f"Pemeran: {actor[:-2]}\n\n" - if deskripsi := r_json.get("description"): - summary = GoogleTranslator("auto", "id").translate(deskripsi) - res_str += f"πŸ“œ Plot: {summary}\n\n" - if keywd := r_json.get("keywords"): - key_ = "".join( - f"#{i.replace(' ', '_').replace('-', '_')}, " for i in keywd.split(",") - ) - res_str += f"πŸ”₯ Kata Kunci: {key_[:-2]} \n" - if award := sop.select('li[data-testid="award_information"]'): - awards = ( - award[0].find(class_="ipc-metadata-list-item__list-content-item").text - ) - res_str += f"πŸ† Penghargaan: {GoogleTranslator('auto', 'id').translate(awards)}\n" - else: - res_str += "\n" - if ott != "": - res_str += f"Tersedia di:\n{ott}\n" - res_str += f"©️ IMDb by @{self.me.username}" - if trailer := r_json.get("trailer"): - trailer_url = trailer["url"] - markup = InlineKeyboardMarkup( - [ + if genre := r_json.get("genre"): + genre = "".join( + f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " + if i in GENRES_EMOJI + else f"#{i.replace('-', '_').replace(' ', '_')}, " + for i in r_json["genre"] + ) + res_str += f"Genre: {genre[:-2]}\n" + if negara := sop.select('li[data-testid="title-details-origin"]'): + country = "".join( + f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " + for country in negara[0].findAll( + class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ) + ) + res_str += f"Negara: {country[:-2]}\n" + if bahasa := sop.select('li[data-testid="title-details-languages"]'): + language = "".join( + f"#{lang.text.replace(' ', '_').replace('-', '_')}, " + for lang in bahasa[0].findAll( + class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ) + ) + res_str += f"Bahasa: {language[:-2]}\n" + res_str += "\nπŸ™Ž Info Cast:\n" + if directors := r_json.get("director"): + director = "".join( + f"{i['name']}, " for i in directors + ) + res_str += f"Sutradara: {director[:-2]}\n" + if creators := r_json.get("creator"): + creator = "".join( + f"{i['name']}, " + for i in creators + if i["@type"] == "Person" + ) + res_str += f"Penulis: {creator[:-2]}\n" + if actors := r_json.get("actor"): + actor = "".join(f"{i['name']}, " for i in actors) + res_str += f"Pemeran: {actor[:-2]}\n\n" + if deskripsi := r_json.get("description"): + summary = GoogleTranslator("auto", "id").translate(deskripsi) + res_str += f"πŸ“œ Plot: {summary}\n\n" + if keywd := r_json.get("keywords"): + key_ = "".join( + f"#{i.replace(' ', '_').replace('-', '_')}, " for i in keywd.split(",") + ) + res_str += f"πŸ”₯ Kata Kunci: {key_[:-2]} \n" + if award := sop.select('li[data-testid="award_information"]'): + awards = ( + award[0].find(class_="ipc-metadata-list-item__list-content-item").text + ) + res_str += f"πŸ† Penghargaan: {GoogleTranslator('auto', 'id').translate(awards)}\n" + else: + res_str += "\n" + if ott != "": + res_str += f"Tersedia di:\n{ott}\n" + res_str += f"©️ IMDb by @{self.me.username}" + if trailer := r_json.get("trailer"): + trailer_url = trailer["url"] + markup = InlineKeyboardMarkup( [ - InlineKeyboardButton("🎬 Open IMDB", url=imdb_url), - InlineKeyboardButton("▢️ Trailer", url=trailer_url), + [ + InlineKeyboardButton("🎬 Open IMDB", url=imdb_url), + InlineKeyboardButton("▢️ Trailer", url=trailer_url), + ] ] - ] - ) - else: - markup = InlineKeyboardMarkup( - [[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]] - ) - if thumb := r_json.get("image"): - try: - await query.message.edit_media( - InputMediaPhoto( - thumb, caption=res_str, parse_mode=enums.ParseMode.HTML - ), - reply_markup=markup, ) - except (PhotoInvalidDimensions, WebpageMediaEmpty): - poster = thumb.replace(".jpg", "._V1_UX360.jpg") - await query.message.edit_media( - InputMediaPhoto( - poster, caption=res_str, parse_mode=enums.ParseMode.HTML - ), - reply_markup=markup, + else: + markup = InlineKeyboardMarkup( + [[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]] ) - except ( - MediaEmpty, - MediaCaptionTooLong, - WebpageCurlFailed, - MessageNotModified, - ): - await query.message.reply( + if thumb := r_json.get("image"): + try: + await query.message.edit_media( + InputMediaPhoto( + thumb, caption=res_str, parse_mode=enums.ParseMode.HTML + ), + reply_markup=markup, + ) + except (PhotoInvalidDimensions, WebpageMediaEmpty): + poster = thumb.replace(".jpg", "._V1_UX360.jpg") + await query.message.edit_media( + InputMediaPhoto( + poster, caption=res_str, parse_mode=enums.ParseMode.HTML + ), + reply_markup=markup, + ) + except ( + MediaEmpty, + MediaCaptionTooLong, + WebpageCurlFailed, + MessageNotModified, + ): + await query.message.reply( + res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup + ) + except Exception as err: + LOGGER.error(f"Terjadi error saat menampilkan data IMDB. ERROR: {err}") + else: + await query.message.edit_caption( res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup ) - except Exception as err: - LOGGER.error(f"Terjadi error saat menampilkan data IMDB. ERROR: {err}") - else: - await query.message.edit_caption( - res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup - ) - except httpx.HTTPError as exc: - await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) - except AttributeError: - await query.message.edit_caption("Maaf, gagal mendapatkan info data dari IMDB.") - except (MessageNotModified, MessageIdInvalid): - pass + except httpx.HTTPError as exc: + await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) + except AttributeError: + await query.message.edit_caption("Maaf, gagal mendapatkan info data dari IMDB.") + except (MessageNotModified, MessageIdInvalid): + pass @app.on_cb("imdbres_en") @@ -537,159 +544,160 @@ async def imdb_en_callback(self: Client, query: CallbackQuery): i, userid, movie = query.data.split("#") if query.from_user.id != int(userid): return await query.answer("⚠️ Access Denied!", True) - try: - await query.message.edit_caption("⏳ Getting IMDb source..") - imdb_url = f"https://www.imdb.com/title/tt{movie}/" - resp = await fetch.get(imdb_url) - resp.raise_for_status() - sop = BeautifulSoup(resp, "lxml") - r_json = json.loads( - sop.find("script", attrs={"type": "application/ld+json"}).contents[0] - ) - ott = await search_jw(r_json.get("name"), "US") - typee = r_json.get("@type", "") - res_str = "" - tahun = ( - re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0] - if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text) - else "N/A" - ) - res_str += f"πŸ“Ή Judul: {r_json.get('name')} [{tahun}] ({typee})\n" - if aka := r_json.get("alternateName"): - res_str += f"πŸ“’ AKA: {aka}\n\n" - else: - res_str += "\n" - if durasi := sop.select('li[data-testid="title-techspec_runtime"]'): - durasi = ( - durasi[0].find(class_="ipc-metadata-list-item__content-container").text + with contextlib.redirect_stdout(sys.stderr): + try: + await query.message.edit_caption("⏳ Getting IMDb source..") + imdb_url = f"https://www.imdb.com/title/tt{movie}/" + resp = await fetch.get(imdb_url) + resp.raise_for_status() + sop = BeautifulSoup(resp, "lxml") + r_json = json.loads( + sop.find("script", attrs={"type": "application/ld+json"}).contents[0] ) - res_str += f"Duration: {durasi}\n" - if kategori := r_json.get("contentRating"): - res_str += f"Category: {kategori} \n" - if rating := r_json.get("aggregateRating"): - res_str += f"Rating: {rating['ratingValue']}⭐️ from {rating['ratingCount']} users\n" - if release := sop.select('li[data-testid="title-details-releasedate"]'): - rilis = ( - release[0] - .find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ott = await search_jw(r_json.get("name"), "US") + typee = r_json.get("@type", "") + res_str = "" + tahun = ( + re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text)[0] + if re.findall(r"\d{4}\W\d{4}|\d{4}-?", sop.title.text) + else "N/A" + ) + res_str += f"πŸ“Ή Judul: {r_json.get('name')} [{tahun}] ({typee})\n" + if aka := r_json.get("alternateName"): + res_str += f"πŸ“’ AKA: {aka}\n\n" + else: + res_str += "\n" + if durasi := sop.select('li[data-testid="title-techspec_runtime"]'): + durasi = ( + durasi[0].find(class_="ipc-metadata-list-item__content-container").text ) - .text - ) - rilis_url = release[0].find( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" - )["href"] - res_str += ( - f"Rilis: {rilis}\n" - ) - if genre := r_json.get("genre"): - genre = "".join( - f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " - if i in GENRES_EMOJI - else f"#{i.replace('-', '_').replace(' ', '_')}, " - for i in r_json["genre"] - ) - res_str += f"Genre: {genre[:-2]}\n" - if negara := sop.select('li[data-testid="title-details-origin"]'): - country = "".join( - f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " - for country in negara[0].findAll( - class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + res_str += f"Duration: {durasi}\n" + if kategori := r_json.get("contentRating"): + res_str += f"Category: {kategori} \n" + if rating := r_json.get("aggregateRating"): + res_str += f"Rating: {rating['ratingValue']}⭐️ from {rating['ratingCount']} users\n" + if release := sop.select('li[data-testid="title-details-releasedate"]'): + rilis = ( + release[0] + .find( + class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ) + .text ) - ) - res_str += f"Country: {country[:-2]}\n" - if bahasa := sop.select('li[data-testid="title-details-languages"]'): - language = "".join( - f"#{lang.text.replace(' ', '_').replace('-', '_')}, " - for lang in bahasa[0].findAll( + rilis_url = release[0].find( class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + )["href"] + res_str += ( + f"Rilis: {rilis}\n" ) - ) - res_str += f"Language: {language[:-2]}\n" - res_str += "\nπŸ™Ž Cast Info:\n" - if r_json.get("director"): - director = "".join( - f"{i['name']}, " for i in r_json["director"] - ) - res_str += f"Director: {director[:-2]}\n" - if r_json.get("creator"): - creator = "".join( - f"{i['name']}, " - for i in r_json["creator"] - if i["@type"] == "Person" - ) - res_str += f"Writer: {creator[:-2]}\n" - if r_json.get("actor"): - actors = actors = "".join( - f"{i['name']}, " for i in r_json["actor"] - ) - res_str += f"Stars: {actors[:-2]}\n\n" - if description := r_json.get("description"): - res_str += f"πŸ“œ Summary: {description}\n\n" - if r_json.get("keywords"): - key_ = "".join( - f"#{i.replace(' ', '_').replace('-', '_')}, " - for i in r_json["keywords"].split(",") - ) - res_str += f"πŸ”₯ Keywords: {key_[:-2]} \n" - if award := sop.select('li[data-testid="award_information"]'): - awards = ( - award[0].find(class_="ipc-metadata-list-item__list-content-item").text - ) - res_str += f"πŸ† Awards: {awards}\n" - else: - res_str += "\n" - if ott != "": - res_str += f"Available On:\n{ott}\n" - res_str += f"©️ IMDb by @{self.me.username}" - if trailer := r_json.get("trailer"): - trailer_url = trailer["url"] - markup = InlineKeyboardMarkup( - [ + if genre := r_json.get("genre"): + genre = "".join( + f"{GENRES_EMOJI[i]} #{i.replace('-', '_').replace(' ', '_')}, " + if i in GENRES_EMOJI + else f"#{i.replace('-', '_').replace(' ', '_')}, " + for i in r_json["genre"] + ) + res_str += f"Genre: {genre[:-2]}\n" + if negara := sop.select('li[data-testid="title-details-origin"]'): + country = "".join( + f"{demoji(country.text)} #{country.text.replace(' ', '_').replace('-', '_')}, " + for country in negara[0].findAll( + class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ) + ) + res_str += f"Country: {country[:-2]}\n" + if bahasa := sop.select('li[data-testid="title-details-languages"]'): + language = "".join( + f"#{lang.text.replace(' ', '_').replace('-', '_')}, " + for lang in bahasa[0].findAll( + class_="ipc-metadata-list-item__list-content-item ipc-metadata-list-item__list-content-item--link" + ) + ) + res_str += f"Language: {language[:-2]}\n" + res_str += "\nπŸ™Ž Cast Info:\n" + if r_json.get("director"): + director = "".join( + f"{i['name']}, " for i in r_json["director"] + ) + res_str += f"Director: {director[:-2]}\n" + if r_json.get("creator"): + creator = "".join( + f"{i['name']}, " + for i in r_json["creator"] + if i["@type"] == "Person" + ) + res_str += f"Writer: {creator[:-2]}\n" + if r_json.get("actor"): + actors = actors = "".join( + f"{i['name']}, " for i in r_json["actor"] + ) + res_str += f"Stars: {actors[:-2]}\n\n" + if description := r_json.get("description"): + res_str += f"πŸ“œ Summary: {description}\n\n" + if r_json.get("keywords"): + key_ = "".join( + f"#{i.replace(' ', '_').replace('-', '_')}, " + for i in r_json["keywords"].split(",") + ) + res_str += f"πŸ”₯ Keywords: {key_[:-2]} \n" + if award := sop.select('li[data-testid="award_information"]'): + awards = ( + award[0].find(class_="ipc-metadata-list-item__list-content-item").text + ) + res_str += f"πŸ† Awards: {awards}\n" + else: + res_str += "\n" + if ott != "": + res_str += f"Available On:\n{ott}\n" + res_str += f"©️ IMDb by @{self.me.username}" + if trailer := r_json.get("trailer"): + trailer_url = trailer["url"] + markup = InlineKeyboardMarkup( [ - InlineKeyboardButton("🎬 Open IMDB", url=imdb_url), - InlineKeyboardButton("▢️ Trailer", url=trailer_url), + [ + InlineKeyboardButton("🎬 Open IMDB", url=imdb_url), + InlineKeyboardButton("▢️ Trailer", url=trailer_url), + ] ] - ] - ) - else: - markup = InlineKeyboardMarkup( - [[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]] - ) - if thumb := r_json.get("image"): - try: - await query.message.edit_media( - InputMediaPhoto( - thumb, caption=res_str, parse_mode=enums.ParseMode.HTML - ), - reply_markup=markup, ) - except (PhotoInvalidDimensions, WebpageMediaEmpty): - poster = thumb.replace(".jpg", "._V1_UX360.jpg") - await query.message.edit_media( - InputMediaPhoto( - poster, caption=res_str, parse_mode=enums.ParseMode.HTML - ), - reply_markup=markup, + else: + markup = InlineKeyboardMarkup( + [[InlineKeyboardButton("🎬 Open IMDB", url=imdb_url)]] ) - except ( - MediaCaptionTooLong, - WebpageCurlFailed, - MediaEmpty, - MessageNotModified, - ): - await query.message.reply( + if thumb := r_json.get("image"): + try: + await query.message.edit_media( + InputMediaPhoto( + thumb, caption=res_str, parse_mode=enums.ParseMode.HTML + ), + reply_markup=markup, + ) + except (PhotoInvalidDimensions, WebpageMediaEmpty): + poster = thumb.replace(".jpg", "._V1_UX360.jpg") + await query.message.edit_media( + InputMediaPhoto( + poster, caption=res_str, parse_mode=enums.ParseMode.HTML + ), + reply_markup=markup, + ) + except ( + MediaCaptionTooLong, + WebpageCurlFailed, + MediaEmpty, + MessageNotModified, + ): + await query.message.reply( + res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup + ) + except Exception as err: + LOGGER.error(f"Error while displaying IMDB Data. ERROR: {err}") + else: + await query.message.edit_caption( res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup ) - except Exception as err: - LOGGER.error(f"Error while displaying IMDB Data. ERROR: {err}") - else: - await query.message.edit_caption( - res_str, parse_mode=enums.ParseMode.HTML, reply_markup=markup - ) - except httpx.HTTPError as exc: - await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) - except AttributeError: - await query.message.edit_caption("Sorry, failed getting data from IMDB.") - except (MessageNotModified, MessageIdInvalid): - pass + except httpx.HTTPError as exc: + await query.message.edit_caption(f"HTTP Exception for IMDB Search - {exc}", disable_web_page_preview=True) + except AttributeError: + await query.message.edit_caption("Sorry, failed getting data from IMDB.") + except (MessageNotModified, MessageIdInvalid): + pass diff --git a/misskaty/plugins/misc_tools.py b/misskaty/plugins/misc_tools.py index 2187142a..f810a902 100644 --- a/misskaty/plugins/misc_tools.py +++ b/misskaty/plugins/misc_tools.py @@ -6,10 +6,12 @@ """ import asyncio +import contextlib import html import json import os import re +import sys import traceback from logging import getLogger from urllib.parse import quote @@ -207,12 +209,13 @@ async def carbon_make(self: Client, ctx: Message): "code": text, "backgroundColor": "#1F816D", } - try: - response = await fetch.post( - "https://carbon.yasirapi.eu.org/api/cook", json=json_data, timeout=20 - ) - except httpx.HTTPError as exc: - return await ctx.reply_msg(f"HTTP Exception for {exc.request.url} - {exc}") + with contextlib.redirect_stdout(sys.stderr): + try: + response = await fetch.post( + "https://carbon.yasirapi.eu.org/api/cook", json=json_data, timeout=20 + ) + except httpx.HTTPError as exc: + return await ctx.reply_msg(f"HTTP Exception for {exc.request.url} - {exc}") if response.status_code != 200: return await ctx.reply_photo( f"https://http.cat/{response.status_code}", @@ -561,17 +564,13 @@ async def who_is(client, message): async def close_callback(_, query: CallbackQuery): _, userid = query.data.split("#") if query.from_user.id != int(userid): - try: + with contextlib.suppress(QueryIdInvalid): return await query.answer("⚠️ Access Denied!", True) - except QueryIdInvalid: - return - try: + with contextlib.redirect_stdout(Exception): await query.answer("Deleting this message in 5 seconds.") await asyncio.sleep(5) await query.message.delete() await query.message.reply_to_message.delete() - except: - pass async def mdlapi(title): diff --git a/misskaty/plugins/web_scraper.py b/misskaty/plugins/web_scraper.py index d4a7ac2c..37f76a71 100644 --- a/misskaty/plugins/web_scraper.py +++ b/misskaty/plugins/web_scraper.py @@ -4,9 +4,11 @@ * @projectName MissKatyPyro * Copyright @YasirPedia All rights reserved """ +import contextlib import httpx import logging import re +import sys import traceback import cloudscraper @@ -71,15 +73,16 @@ def split_arr(arr, size: 5): # Terbit21 GetData async def getDataTerbit21(msg, kueri, CurrentPage, strings): if not SCRAP_DICT.get(msg.id): - try: - if kueri: - terbitjson = await fetch.get(f"{web['yasirapi']}/terbit21?q={kueri}") - else: - terbitjson = await fetch.get(f"{web['yasirapi']}/terbit21") - terbitjson.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") - return None, None + with contextlib.redirect_stdout(sys.stderr): + try: + if kueri: + terbitjson = await fetch.get(f"{web['yasirapi']}/terbit21?q={kueri}") + else: + terbitjson = await fetch.get(f"{web['yasirapi']}/terbit21") + terbitjson.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") + return None, None res = terbitjson.json() if not res.get("result"): await msg.edit_msg(strings("no_result"), del_in=5) @@ -108,15 +111,16 @@ async def getDataTerbit21(msg, kueri, CurrentPage, strings): # LK21 GetData async def getDatalk21(msg, kueri, CurrentPage, strings): if not SCRAP_DICT.get(msg.id): - try: - if kueri: - lk21json = await fetch.get(f"{web['yasirapi']}/lk21?q={kueri}") - else: - lk21json = await fetch.get(f"{web['yasirapi']}/lk21") - lk21json.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") - return None, None + with contextlib.redirect_stdout(sys.stderr): + try: + if kueri: + lk21json = await fetch.get(f"{web['yasirapi']}/lk21?q={kueri}") + else: + lk21json = await fetch.get(f"{web['yasirapi']}/lk21") + lk21json.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") + return None, None res = lk21json.json() if not res.get("result"): await msg.edit_msg(strings("no_result"), del_in=5) @@ -143,15 +147,16 @@ async def getDatalk21(msg, kueri, CurrentPage, strings): # Pahe GetData async def getDataPahe(msg, kueri, CurrentPage, strings): if not SCRAP_DICT.get(msg.id): - try: - if kueri: - pahejson = await fetch.get(f"{web['yasirapi']}/pahe?q={kueri}") - else: - pahejson = await fetch.get(f"{web['yasirapi']}/pahe") - pahejson.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") - return None, None + with contextlib.redirect_stdout(sys.stderr): + try: + if kueri: + pahejson = await fetch.get(f"{web['yasirapi']}/pahe?q={kueri}") + else: + pahejson = await fetch.get(f"{web['yasirapi']}/pahe") + pahejson.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") + return None, None res = pahejson.json() if not res.get("result"): await msg.edit_msg(strings("no_result"), del_in=5) @@ -175,14 +180,15 @@ async def getDataPahe(msg, kueri, CurrentPage, strings): async def getDataKuso(msg, kueri, CurrentPage, user, strings): if not SCRAP_DICT.get(msg.id): kusodata = [] - try: - data = await fetch.get( - f"{web['kusonime']}/?s={kueri}", follow_redirects=True - ) - data.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}", disable_web_page_preview=True) - return None, 0, None, None + with contextlib.redirect_stdout(sys.stderr): + try: + data = await fetch.get( + f"{web['kusonime']}/?s={kueri}", follow_redirects=True + ) + data.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}", disable_web_page_preview=True) + return None, 0, None, None res = BeautifulSoup(data, "lxml").find_all("h2", {"class": "episodeye"}) for i in res: ress = i.find_all("a")[0] @@ -224,14 +230,15 @@ async def getDataKuso(msg, kueri, CurrentPage, user, strings): async def getDataMovieku(msg, kueri, CurrentPage, strings): if not SCRAP_DICT.get(msg.id): moviekudata = [] - try: - data = await fetch.get( - f"{web['movieku']}/?s={kueri}", follow_redirects=True - ) - data.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") - return None, None + with contextlib.redirect_stdout(sys.stderr): + try: + data = await fetch.get( + f"{web['movieku']}/?s={kueri}", follow_redirects=True + ) + data.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}") + return None, None r = BeautifulSoup(data, "lxml") res = r.find_all(class_="bx") for i in res: @@ -261,14 +268,15 @@ async def getDataMovieku(msg, kueri, CurrentPage, strings): async def getDataNodrakor(msg, kueri, CurrentPage, user, strings): if not SCRAP_DICT.get(msg.id): nodrakordata = [] - try: - data = await fetch.get( - f"{web['nodrakor']}/?s={kueri}", follow_redirects=True, - ) - data.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", disable_web_page_preview=True) - return None, 0, None + with contextlib.redirect_stdout(sys.stderr): + try: + data = await fetch.get( + f"{web['nodrakor']}/?s={kueri}", follow_redirects=True, + ) + data.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", disable_web_page_preview=True) + return None, 0, None text = BeautifulSoup(data, "lxml") entry = text.find_all(class_="entry-header") if entry[0].text.strip() == "Nothing Found": @@ -308,14 +316,15 @@ async def getDataNodrakor(msg, kueri, CurrentPage, user, strings): async def getDataSavefilm21(msg, kueri, CurrentPage, user, strings): if not SCRAP_DICT.get(msg.id): sfdata = [] - try: - data = await fetch.get( - f"{web['savefilm21']}/?s={kueri}", follow_redirects=True, - ) - data.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", disable_web_page_preview=True) - return None, 0, None + with contextlib.redirect_stdout(sys.stderr): + try: + data = await fetch.get( + f"{web['savefilm21']}/?s={kueri}", follow_redirects=True, + ) + data.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", disable_web_page_preview=True) + return None, 0, None text = BeautifulSoup(data, "lxml") entry = text.find_all(class_="entry-header") if "Tidak Ditemukan" in entry[0].text: @@ -354,17 +363,18 @@ async def getDataSavefilm21(msg, kueri, CurrentPage, user, strings): # Lendrive GetData async def getDataLendrive(msg, kueri, CurrentPage, user, strings): if not SCRAP_DICT.get(msg.id): - try: - if kueri: - data = await fetch.get( - f"{web['lendrive']}/?s={kueri}", follow_redirects=True, - ) - else: - data = await fetch.get(web["lendrive"], follow_redirects=True) - data.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}", disable_web_page_preview=True) - return None, 0, None + with contextlib.redirect_stdout(sys.stderr): + try: + if kueri: + data = await fetch.get( + f"{web['lendrive']}/?s={kueri}", follow_redirects=True, + ) + else: + data = await fetch.get(web["lendrive"], follow_redirects=True) + data.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}", disable_web_page_preview=True) + return None, 0, None res = BeautifulSoup(data, "lxml") lenddata = [] for o in res.find_all(class_="bsx"): @@ -409,14 +419,15 @@ async def getDataLendrive(msg, kueri, CurrentPage, user, strings): # MelongMovie GetData async def getDataMelong(msg, kueri, CurrentPage, user, strings): if not SCRAP_DICT.get(msg.id): - try: - data = await fetch.get( - f"{web['melongmovie']}/?s={kueri}", follow_redirects=True, - ) - data.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", disable_web_page_preview=True) - return None, 0, None + with contextlib.redirect_stdout(sys.stderr): + try: + data = await fetch.get( + f"{web['melongmovie']}/?s={kueri}", follow_redirects=True, + ) + data.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", disable_web_page_preview=True) + return None, 0, None bs4 = BeautifulSoup(data, "lxml") melongdata = [] for res in bs4.select(".box"): @@ -454,14 +465,15 @@ async def getDataMelong(msg, kueri, CurrentPage, user, strings): # GoMov GetData async def getDataGomov(msg, kueri, CurrentPage, user, strings): if not SCRAP_DICT.get(msg.id): - try: - gomovv = await fetch.get( - f"{web['gomov']}/?s={kueri}", follow_redirects=True - ) - gomovv.raise_for_status() - except httpx.HTTPError as exc: - await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}", disable_web_page_preview=True) - return None, 0, None + with contextlib.redirect_stdout(sys.stderr): + try: + gomovv = await fetch.get( + f"{web['gomov']}/?s={kueri}", follow_redirects=True + ) + gomovv.raise_for_status() + except httpx.HTTPError as exc: + await msg.edit_msg(f"ERROR: Failed to fetch data from {exc.request.url} - {exc}", disable_web_page_preview=True) + return None, 0, None text = BeautifulSoup(gomovv, "lxml") entry = text.find_all(class_="entry-header") if entry[0].text.strip() == "Tidak Ditemukan": @@ -1310,19 +1322,20 @@ async def savefilm21_scrap(_, callback_query, strings): ), InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"), ) - try: - html = await fetch.get(link) - html.raise_for_status() - soup = BeautifulSoup(html.text, "lxml") - res = soup.find_all(class_="button button-shadow") - res = "".join(f"{i.text}\n{i['href']}\n\n" for i in res) - await callback_query.message.edit_msg( - strings("res_scrape").format(link=link, kl=res), reply_markup=keyboard - ) - except httpx.HTTPError as exc: - await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) - except Exception as err: - await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) + with contextlib.redirect_stdout(sys.stderr): + try: + html = await fetch.get(link) + html.raise_for_status() + soup = BeautifulSoup(html.text, "lxml") + res = soup.find_all(class_="button button-shadow") + res = "".join(f"{i.text}\n{i['href']}\n\n" for i in res) + await callback_query.message.edit_msg( + strings("res_scrape").format(link=link, kl=res), reply_markup=keyboard + ) + except httpx.HTTPError as exc: + await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) + except Exception as err: + await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) # NoDrakor DDL @@ -1349,64 +1362,66 @@ async def nodrakorddl_scrap(_, callback_query, strings): ), InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"), ) - try: - html = await fetch.get(link) - html.raise_for_status() - soup = BeautifulSoup(html.text, "lxml") - if "/tv/" in link: - result = soup.find("div", {"entry-content entry-content-single"}).find_all("p") - msg = "" - for i in result: - msg += str(f"{i}\n") - link = await post_to_telegraph(False, "MissKaty NoDrakor", msg) - return await callback_query.message.edit_msg( - strings("res_scrape").format(link=link, kl=link), reply_markup=keyboard + with contextlib.redirect_stdout(sys.stderr): + try: + html = await fetch.get(link) + html.raise_for_status() + soup = BeautifulSoup(html.text, "lxml") + if "/tv/" in link: + result = soup.find("div", {"entry-content entry-content-single"}).find_all("p") + msg = "" + for i in result: + msg += str(f"{i}\n") + link = await post_to_telegraph(False, "MissKaty NoDrakor", msg) + return await callback_query.message.edit_msg( + strings("res_scrape").format(link=link, kl=link), reply_markup=keyboard + ) + res = soup.find_all(class_="button button-shadow") + res = "".join(f"{i.text}\n{i['href']}\n\n" for i in res) + if len(res) > 3500: + link = await post_to_telegraph(False, "MissKaty NoDrakor", res) + return await callback_query.message.edit_msg( + strings("res_scrape").format(link=link, kl=link), reply_markup=keyboard + ) + await callback_query.message.edit_msg( + strings("res_scrape").format(link=link, kl=res), reply_markup=keyboard ) - res = soup.find_all(class_="button button-shadow") - res = "".join(f"{i.text}\n{i['href']}\n\n" for i in res) - if len(res) > 3500: - link = await post_to_telegraph(False, "MissKaty NoDrakor", res) - return await callback_query.message.edit_msg( - strings("res_scrape").format(link=link, kl=link), reply_markup=keyboard - ) - await callback_query.message.edit_msg( - strings("res_scrape").format(link=link, kl=res), reply_markup=keyboard - ) - except httpx.HTTPError as exc: - await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) - except Exception as err: - await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) + except httpx.HTTPError as exc: + await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) + except Exception as err: + await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) # Scrape Link Download Movieku.CC @app.on_cmd("movieku_scrap") @use_chat_lang() async def muviku_scrap(_, message, strings): - try: - link = message.text.split(maxsplit=1)[1] - html = await fetch.get(link) - html.raise_for_status() - soup = BeautifulSoup(html.text, "lxml") - res = soup.find_all(class_="smokeurl") - data = [] - for i in res: - for b in range(len(i.find_all("a"))): - link = i.find_all("a")[b]["href"] - kualitas = i.find_all("a")[b].text - # print(f"{kualitas}\n{link - data.append({"link": link, "kualitas": kualitas}) - if not data: - return await message.reply(strings("no_result")) - res = "".join(f"Host: {i['kualitas']}\n{i['link']}\n\n" for i in data) - await message.reply(res) - except IndexError: - return await message.reply( - strings("invalid_cmd_scrape").format(cmd=message.command[0]) - ) - except httpx.HTTPError as exc: - await message.reply(f"HTTP Exception for {exc.request.url} - {exc}") - except Exception as e: - await message.reply(f"ERROR: {str(e)}") + with contextlib.redirect_stdout(sys.stderr): + try: + link = message.text.split(maxsplit=1)[1] + html = await fetch.get(link) + html.raise_for_status() + soup = BeautifulSoup(html.text, "lxml") + res = soup.find_all(class_="smokeurl") + data = [] + for i in res: + for b in range(len(i.find_all("a"))): + link = i.find_all("a")[b]["href"] + kualitas = i.find_all("a")[b].text + # print(f"{kualitas}\n{link + data.append({"link": link, "kualitas": kualitas}) + if not data: + return await message.reply(strings("no_result")) + res = "".join(f"Host: {i['kualitas']}\n{i['link']}\n\n" for i in data) + await message.reply(res) + except IndexError: + return await message.reply( + strings("invalid_cmd_scrape").format(cmd=message.command[0]) + ) + except httpx.HTTPError as exc: + await message.reply(f"HTTP Exception for {exc.request.url} - {exc}") + except Exception as e: + await message.reply(f"ERROR: {str(e)}") # Scrape DDL Link Melongmovie @@ -1433,22 +1448,23 @@ async def melong_scrap(_, callback_query, strings): ), InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"), ) - try: - html = await fetch.get(link) - html.raise_for_status() - soup = BeautifulSoup(html.text, "lxml") - rep = "" - for ep in soup.findAll(text=re.compile(r"(?i)episode\s+\d+|LINK DOWNLOAD")): - hardsub = ep.findPrevious("div") - softsub = ep.findNext("div") - rep += f"{hardsub}\n{softsub}" - await callback_query.message.edit_msg( - strings("res_scrape").format(link=link, kl=rep), reply_markup=keyboard - ) - except httpx.HTTPError as exc: - await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) - except Exception as err: - await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) + with contextlib.redirect_stdout(sys.stderr): + try: + html = await fetch.get(link) + html.raise_for_status() + soup = BeautifulSoup(html.text, "lxml") + rep = "" + for ep in soup.findAll(text=re.compile(r"(?i)episode\s+\d+|LINK DOWNLOAD")): + hardsub = ep.findPrevious("div") + softsub = ep.findNext("div") + rep += f"{hardsub}\n{softsub}" + await callback_query.message.edit_msg( + strings("res_scrape").format(link=link, kl=rep), reply_markup=keyboard + ) + except httpx.HTTPError as exc: + await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) + except Exception as err: + await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) # Scrape DDL Link Gomov @@ -1475,23 +1491,24 @@ async def gomov_dl(_, callback_query, strings): ), InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"), ) - try: - html = await fetch.get(link) - html.raise_for_status() - soup = BeautifulSoup(html.text, "lxml") - entry = soup.find(class_="gmr-download-wrap clearfix") - hasil = soup.find(class_="title-download").text - for i in entry.find(class_="list-inline gmr-download-list clearfix"): - title = i.find("a").text - ddl = i.find("a")["href"] - hasil += f"\n{title}\n{ddl}\n" - await callback_query.message.edit_msg( - strings("res_scrape").format(link=link, kl=hasil), reply_markup=keyboard - ) - except httpx.HTTPError as exc: - await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) - except Exception as err: - await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) + with contextlib.redirect_stdout(sys.stderr): + try: + html = await fetch.get(link) + html.raise_for_status() + soup = BeautifulSoup(html.text, "lxml") + entry = soup.find(class_="gmr-download-wrap clearfix") + hasil = soup.find(class_="title-download").text + for i in entry.find(class_="list-inline gmr-download-list clearfix"): + title = i.find("a").text + ddl = i.find("a")["href"] + hasil += f"\n{title}\n{ddl}\n" + await callback_query.message.edit_msg( + strings("res_scrape").format(link=link, kl=hasil), reply_markup=keyboard + ) + except httpx.HTTPError as exc: + await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) + except Exception as err: + await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) @app.on_cb("lendriveextract#") @@ -1515,23 +1532,24 @@ async def lendrive_dl(_, callback_query, strings): ), InlineButton(strings("cl_btn"), f"close#{callback_query.from_user.id}"), ) - try: - hmm = await fetch.get(link) - hmm.raise_for_status() - q = BeautifulSoup(hmm.text, "lxml") - j = q.findAll("div", class_="soraurlx") - kl = "" - for i in j: - if not i.find("a"): - continue - kl += f"{i.find('strong')}:\n" - kl += "".join( - f"[ {a.text} ]\n" for a in i.findAll("a") + with contextlib.redirect_stdout(sys.stderr): + try: + hmm = await fetch.get(link) + hmm.raise_for_status() + q = BeautifulSoup(hmm.text, "lxml") + j = q.findAll("div", class_="soraurlx") + kl = "" + for i in j: + if not i.find("a"): + continue + kl += f"{i.find('strong')}:\n" + kl += "".join( + f"[ {a.text} ]\n" for a in i.findAll("a") + ) + await callback_query.message.edit_msg( + strings("res_scrape").format(link=link, kl=kl), reply_markup=keyboard ) - await callback_query.message.edit_msg( - strings("res_scrape").format(link=link, kl=kl), reply_markup=keyboard - ) - except httpx.HTTPError as exc: - await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) - except Exception as err: - await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard) + except httpx.HTTPError as exc: + await callback_query.message.edit_msg(f"HTTP Exception for {exc.request.url} - {exc}", reply_markup=keyboard) + except Exception as err: + await callback_query.message.edit_msg(f"ERROR: {err}", reply_markup=keyboard)