coab dulu

This commit is contained in:
yasirarism 2023-07-04 10:32:16 +07:00 committed by GitHub
parent 647ba465b4
commit 96caa41d6b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 397 additions and 87 deletions

View file

@ -1,8 +0,0 @@
from pyrogram import filters
def pesanedit(_, __, m):
return bool(m.edit_date)
edited = filters.create(pesanedit)

View file

@ -1,67 +0,0 @@
import asyncio
from logging import getLogger
from pyrogram.errors import (
ChatAdminRequired,
ChatWriteForbidden,
FloodWait,
MessageDeleteForbidden,
MessageEmpty,
MessageIdInvalid,
MessageNotModified,
)
LOGGER = getLogger(__name__)
# handler for TG function, so need write exception in every code
# Send MSG Pyro
async def kirimPesan(msg, text, **kwargs):
try:
return await msg.reply(text, **kwargs)
except FloodWait as e:
LOGGER.warning(str(e))
await asyncio.sleep(e.value)
return await kirimPesan(msg, text, **kwargs)
except (ChatWriteForbidden, ChatAdminRequired):
LOGGER.info(
f"Leaving from {msg.chat.title} [{msg.chat.id}] because doesn't have admin permission."
)
return await msg.chat.leave()
except Exception as e:
LOGGER.error(str(e))
return
# Edit MSG Pyro
async def editPesan(msg, text, **kwargs):
try:
return await msg.edit(text, **kwargs)
except FloodWait as e:
LOGGER.warning(str(e))
await asyncio.sleep(e.value)
return await editPesan(msg, text, **kwargs)
except (MessageNotModified, MessageIdInvalid, MessageEmpty):
return
except (ChatWriteForbidden, ChatAdminRequired):
LOGGER.info(
f"Leaving from {msg.chat.title} [{msg.chat.id}] because doesn't have admin permission."
)
return await msg.chat.leave()
except Exception as e:
LOGGER.error(str(e))
return
async def hapusPesan(msg):
try:
return await msg.delete()
except (MessageDeleteForbidden, ChatAdminRequired):
return
except FloodWait as e:
LOGGER.warning(str(e))
await asyncio.sleep(e.value)
return await hapusPesan(msg)
except Exception as e:
LOGGER.error(str(e))

View file

@ -1 +1,2 @@
from .command import command
from .command import command
from .callback import callback

View file

@ -0,0 +1,100 @@
# tgEasy - Easy for a brighter Shine. A monkey pather add-on for Pyrogram
# Copyright (C) 2021 - 2022 Jayant Hegde Kageri <https://github.com/jayantkageri>
# This file is part of tgEasy.
# tgEasy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# tgEasy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with tgEasy. If not, see <http://www.gnu.org/licenses/>.
import typing
import pyrogram
from pyrogram.methods import Decorators
from .utils import handle_error
def callback(
self,
data: typing.Union[str, list],
self_admin: typing.Union[bool, bool] = False,
filter: typing.Union[pyrogram.filters.Filter, pyrogram.filters.Filter] = None,
*args,
**kwargs,
):
"""
### `Client.callback`
- A decorater to Register Callback Quiries in simple way and manage errors in that Function itself, alternative for `@pyrogram.Client.on_callback_query(pyrogram.filters.regex('^data.*'))`
- Parameters:
- data (str || list):
- The callback query to be handled for a function
- self_admin (bool) **optional**:
- If True, the command will only executeed if the Bot is Admin in the Chat, By Default False
- filter (`~pyrogram.filters`) **optional**:
- Pyrogram Filters, hope you know about this, for Advaced usage. Use `and` for seaperating filters.
#### Example
.. code-block:: python
import pyrogram
app = pyrogram.Client()
@app.command("start")
async def start(client, message):
await message.reply_text(
f"Hello {message.from_user.mention}",
reply_markup=pyrogram.types.InlineKeyboardMarkup([[
pyrogram.types.InlineKeyboardButton(
"Click Here",
"data"
)
]])
)
@app.callback("data")
async def data(client, CallbackQuery):
await CallbackQuery.answer("Hello :)", show_alert=True)
"""
if filter:
filter = pyrogram.filters.regex(f"^{data}.*") & args["filter"]
else:
filter = pyrogram.filters.regex(f"^{data}.*")
def wrapper(func):
async def decorator(client, CallbackQuery: pyrogram.types.CallbackQuery):
if self_admin:
me = await client.get_chat_member(
CallbackQuery.message.chat.id, (await client.get_me()).id
)
if me.status not in (
pyrogram.enums.ChatMemberStatus.OWNER,
pyrogram.enums.ChatMemberStatus.ADMINISTRATOR,
):
return await CallbackQuery.message.edit_text(
"I must be admin to execute this Command"
)
try:
await func(client, CallbackQuery)
except pyrogram.errors.exceptions.forbidden_403.ChatAdminRequired:
pass
except BaseException as e:
return await handle_error(e, CallbackQuery)
self.add_handler(
pyrogram.handlers.CallbackQueryHandler(decorator, filter)
)
return decorator
return wrapper
Decorators.on_cb = callback

View file

@ -7,6 +7,7 @@ from misskaty.vars import COMMAND_HANDLER
def command(
self,
command: typing.Union[str, list],
is_disabled: typing.Union[bool, bool] = False
pm_only: typing.Union[bool, bool] = False,
group_only: typing.Union[bool, bool] = False,
self_admin: typing.Union[bool, bool] = False,
@ -17,7 +18,7 @@ def command(
**kwargs
):
"""
### `tgEasy.tgClient.command`
### `tgClient.command`
- A decorater to Register Commands in simple way and manage errors in that Function itself, alternative for `@pyrogram.Client.on_message(pyrogram.filters.command('command'))`
- Parameters:
- command (str || list):
@ -44,11 +45,10 @@ def command(
#### Example
.. code-block:: python
import pyrogram
from tgEasy import tgClient
app = tgClient(pyrogram.Client())
app = pyrogram.Client()
@app.command("start", group_only=False, pm_only=False, self_admin=False, self_only=False, pyrogram.filters.chat("777000") and pyrogram.filters.text)
@app.command("start", is_disabled=False, group_only=False, pm_only=False, self_admin=False, self_only=False, pyrogram.filters.chat("777000") and pyrogram.filters.text)
async def start(client, message):
await message.reply_text(f"Hello {message.from_user.mention}")
"""
@ -78,6 +78,8 @@ def command(
def wrapper(func):
async def decorator(client, message: pyrogram.types.Message):
if is_disabled:
return await message.reply_text("Sorry, this command has been disabled by owner.")
if (
self_admin
and message.chat.type != pyrogram.enums.ChatType.SUPERGROUP

View file

@ -1,2 +1,4 @@
from .utils import PyromodConfig, patch, patchable
from .handle_error import handle_error
from .handle_error import handle_error
from .admin_utils import check_rights, is_admin
from .get_user import get_user

View file

@ -0,0 +1,154 @@
# tgEasy - Easy for a brighter Shine. A monkey pather add-on for Pyrogram
# Copyright (C) 2021 - 2022 Jayant Hegde Kageri <https://github.com/jayantkageri>
# This file is part of tgEasy.
# tgEasy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# tgEasy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with tgEasy. If not, see <http://www.gnu.org/licenses/>.
import typing
import pyrogram
async def check_rights(
chat_id: typing.Union[int, int],
user_id: typing.Union[int, int],
rights: typing.Union[str, list],
client,
) -> bool:
"""
### `check_rights`
- Checks the Rights of an User
- This is an Helper Function for `adminsOnly`
- Parameters:
- chat_id (int):
- The Chat ID of Which Chat have to check the Rights.
- user_id (int):
- The User ID of Whose Rights have to Check.
- rights (str):
- The Rights have to Check.
- client (`pyrogram.Client`):
- From which Client to Check the Rights.
- Returns:
- `True` if the User have the Right.
- `False` if the User don't have the Right.
#### Example
.. code-block:: python
import pyrogram
app = pyrogram.Client()
@app.command("ban", group_only=True, self_admin=True)
async def ban(client, message):
if not await check_rights(message.chat.id, message.from_user.id, "can_restrict_members"):
return await message.reply_text("You don't have necessary rights to use this Command.")
user = await get_user(message)
await message.chat.kick_member(user.id)
"""
try:
user = await client.get_chat_member(chat_id, user_id)
except Exception:
return False
if user.status == "user":
return False
if user.status in (
pyrogram.enums.ChatMemberStatus.OWNER,
pyrogram.enums.ChatMemberStatus.ADMINISTRATOR,
):
permission = []
if user.privileges.can_manage_chat:
permission.append("can_manage_chat")
if user.privileges.can_delete_messages:
permission.append("can_delete_messages")
if user.privileges.can_manage_video_chats:
permission.append("can_manage_video_chats")
if user.privileges.can_restrict_members:
permission.append("can_restrict_members")
if user.privileges.can_promote_members:
permission.append("can_promote_members")
if user.privileges.can_change_info:
permission.append("can_change_info")
if user.privileges.can_post_messages:
permission.append("can_post_messages")
if user.privileges.can_edit_messages:
permission.append("can_edit_messages")
if user.privileges.can_invite_users:
permission.append("can_invite_users")
if user.privileges.can_pin_messages:
permission.append("can_pin_messages")
if user.privileges.is_anonymous:
permission.append("is_anonymous")
if isinstance(rights, str):
return rights in permission
if isinstance(rights, list):
for right in rights:
return right in permission
return False
async def is_admin(
chat_id: typing.Union[int, str], user_id: typing.Union[int, str], client
) -> bool:
"""
### `is_admin`
- A Functions to Check if the User is Admin or not
- Parameters:
- chat_id (int):
- The Chat ID of Which Chat have to check the Admin Status.
- user_id (int):
- The User ID of Whose Admin Status have to Check.
- client (`pyrogram.Client`):
- From which Client to Check the Admin Status.
- Returns:
- `True` if the User is Admin.
- `False` if the User is't Admin.
#### Example
.. code-block:: python
import pyrogram
app = pyrogram.Client()
@app.command("ban", group_only=True, self_admin=True)
@app.adminsOnly("can_restrict_members")
async def ban(client, message):
if await is_admin(message.chat.id, (await get_user(mesasge)).id):
return await message.reply_text("You can't Ban Admins.")
await message.chat.kick_member((await get_user(message)).id)
await message.reply_text("User has been Banned.")
"""
try:
user = await client.get_chat_member(chat_id, user_id)
except Exception:
return False
return user.status in (pyrogram.enums.ChatMemberStatus.OWNER, pyrogram.enums.ChatMemberStatus.ADMINISTRATOR,)

View file

@ -0,0 +1,122 @@
# tgEasy - Easy for a brighter Shine. A monkey pather add-on for Pyrogram
# Copyright (C) 2021 - 2022 Jayant Hegde Kageri <https://github.com/jayantkageri>
# This file is part of tgEasy.
# tgEasy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# tgEasy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with tgEasy. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import typing
import pyrogram
async def get_user(
m: typing.Union[pyrogram.types.Message, pyrogram.types.CallbackQuery]
) -> pyrogram.types.User or bool:
"""
### `tgEasy.get_user`
- Gets a User from Message/RepliedMessage/CallbackQuery
- Parameters:
- m (`~pyrogram.types.Message` || `~pyrogram.types.CallbackQuery`)
- Returns:
- `pyrogram.types.User` on Success
- `False` on Error
#### Example
.. code-block:: python
from tgEasy import get_user, command, adminsOnly
@command("ban", group_only=True, self_admin=True)
@adminsOnly("can_restrict_members")
async def ban(client, message):
user = await get_user(message)
await message.chat.kick_member(user.id)
"""
if isinstance(m, pyrogram.types.Message):
message = m
client = m._client
if isinstance(m, pyrogram.types.CallbackQuery):
message = m.message
client = message._client
if message.reply_to_message:
if message.reply_to_message.sender_chat:
return False
return await client.get_users(message.reply_to_message.from_user.id)
command = message.command[1] if len(message.command) > 1 else None
if command and (command.startswith("@") or command.isdigit()):
with contextlib.suppress(
pyrogram.errors.exceptions.bad_request_400.UsernameNotOccupied,
pyrogram.errors.exceptions.bad_request_400.UsernameInvalid,
pyrogram.errors.exceptions.bad_request_400.PeerIdInvalid,
IndexError,
):
return await client.get_users(message.command[1])
if message.entities:
for mention in message.entities:
if mention.type == "text_mention":
user = mention.user.id
break
with contextlib.suppress(Exception):
return await client.get_users(user)
return False
async def get_user_adv(
m: typing.Union[pyrogram.types.Message, pyrogram.types.CallbackQuery]
) -> pyrogram.types.User or bool:
"""
### `tgEasy.get_user_adv`
- A Function to Get the User from the Message/CallbackQuery, If there is None arguments, returns the From User.
- Parameters:
- m (`pyrogram.types.Message` || `pyrogram.types.CallbackQuery`):
- Message or Callbackquery.
- Returns:
- `pyrogram.types.User` on Success
- `False` on Error
#### Example
.. code-block:: python
from tgEasy import command, get_user_adv
@command("id")
async def id(client, message):
user = await get_user_adv(message)
await message.reply_text(f"Your ID is `{user.id}`")
"""
if isinstance(m, pyrogram.types.Message):
message = m
if isinstance(m, pyrogram.types.CallbackQuery):
message = m.message
if message.sender_chat:
return False
with contextlib.suppress(IndexError, AttributeError):
if len(message.command) > 1:
if message.command[1].startswith("@"):
return await get_user(message)
if message.command[1].isdigit():
return await get_user(message)
if "text_mention" in message.entities:
return await get_user(message)
if "from_user" in str(message.reply_to_message):
return await get_user(message)
with contextlib.suppress(Exception):
if "sender_chat" in str(message.reply_to_message):
return False
if "from_user" in str(message.reply_to_message):
return await message._client.get_users(
message.reply_to_message.from_user.id
)
return await message._client.get_users(message.from_user.id)

View file

@ -19,6 +19,7 @@ import contextlib
import os
import typing
import pyrogram
from datetime import datetime
from misskaty.vars import LOG_CHANNEL
@ -40,7 +41,6 @@ async def handle_error(
#### Exapmle
.. code-block:: python
from tgEasy import tgClient, handle_error
import pyrogram
app = tgClient(pyrogram.Client())
@ -58,6 +58,10 @@ async def handle_error(
logging = logger.getLogger(__name__)
logging.exception(traceback.format_exc())
day = datetime.now()
tgl_now = datetime.now()
cap_day = f"{day.strftime('%A')}, {tgl_now.strftime('%d %B %Y %H:%M:%S')}"
with open("crash.txt", "w+", encoding="utf-8") as log:
log.write(traceback.format_exc())
log.close()
@ -67,7 +71,7 @@ async def handle_error(
"An Internal Error Occurred while Processing your Command, the Logs have been sent to the Owners of this Bot. Sorry for Inconvenience"
)
await m._client.send_document(
LOG_CHANNEL, "crash.txt", caption="Crash Report of this Bot"
LOG_CHANNEL, crash_{tgl_now.strftime('%d %B %Y')}.txt, caption="Crash Report of this Bot\n{cap_day}"
)
if isinstance(m, pyrogram.types.CallbackQuery):
with contextlib.suppress(Exception):
@ -76,7 +80,7 @@ async def handle_error(
"An Internal Error Occurred while Processing your Command, the Logs have been sent to the Owners of this Bot. Sorry for Inconvenience"
)
await m.message._client.send_document(
LOG_CHANNEL, "crash.txt", caption="Crash Report of this Bot"
LOG_CHANNEL, crash_{tgl_now.strftime('%d %B %Y')}.txt, caption="Crash Report of this Bot\n{cap_day}"
)
os.remove("crash.txt")
os.remove(crash_{tgl_now.strftime('%d %B %Y')}.txt)
return True

View file

@ -22,7 +22,7 @@ openai.api_key = OPENAI_API
# This only for testing things, since maybe in future it will got blocked
@app.on_message(filters.command("bard", COMMAND_HANDLER))
@app.on_cmd("bard", is_disabled=True)
@use_chat_lang()
async def bard_chatbot(self: Client, ctx: Message, strings):
if len(ctx.command) == 1:

View file

@ -37,7 +37,7 @@ async def getTitleSub(msg, kueri, CurrentPage, user):
lists = soup.find("div", {"class": "search-result"})
entry = lists.find_all("div", {"class": "title"})
# if "Tidak Ditemukan" in entry[0].text:
# await editPesan(msg, f"Sorry, could not find any result for: {kueri}")
# await msg.edit_msg(f"Sorry, could not find any result for: {kueri}")
# return None, 0, None
for sub in entry:
title = sub.find("a").text