pyrofork/pyrogram/methods/auth/sign_in_bot.py
wulan17 5aae488747
pyrofork: Initial Dynamic datacenters ip address support
Signed-off-by: wulan17 <wulan17@nusantararom.org>
2024-07-13 19:26:19 +07:00

105 lines
4.4 KiB
Python

# Pyrofork - Telegram MTProto API Client Library for Python
# Copyright (C) 2017-present Dan <https://github.com/delivrance>
# Copyright (C) 2022-present Mayuri-Chan <https://github.com/Mayuri-Chan>
#
# This file is part of Pyrofork.
#
# Pyrofork is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrofork is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrofork. If not, see <http://www.gnu.org/licenses/>.
import logging
import pyrogram
from pyrogram import raw
from pyrogram import types
from pyrogram.errors import UserMigrate
from pyrogram.session import Session, Auth
log = logging.getLogger(__name__)
class SignInBot:
async def sign_in_bot(
self: "pyrogram.Client",
bot_token: str
) -> "types.User":
"""Authorize a bot using its bot token generated by BotFather.
.. include:: /_includes/usable-by/bots.rst
Parameters:
bot_token (``str``):
The bot token generated by BotFather
Returns:
:obj:`~pyrogram.types.User`: On success, the bot identity is return in form of a user object.
Raises:
BadRequest: In case the bot token is invalid.
"""
while True:
try:
r = await self.invoke(
raw.functions.auth.ImportBotAuthorization(
flags=0,
api_id=self.api_id,
api_hash=self.api_hash,
bot_auth_token=bot_token
)
)
except UserMigrate as e:
config = await self.invoke(raw.functions.help.GetConfig())
for option in config.dc_options:
if (option.id == e.value):
if option.media_only:
if option.ipv6:
await self.storage.media_address_v6(option.ip_address)
else:
await self.storage.media_address(option.ip_address)
if option.this_port_only:
await self.storage.media_port(option.port)
else:
if option.ipv6:
await self.storage.server_address_v6(option.ip_address)
else:
await self.storage.server_address(option.ip_address)
if option.this_port_only:
await self.storage.port(option.port)
if e not in [2,4] or self.storage.test_mode():
await self.storage.media_address(await self.storage.server_address())
await self.storage.media_address_v6(await self.storage.server_address_v6())
await self.storage.media_port(await self.storage.server_port())
await self.session.stop()
await self.storage.dc_id(e.value)
await self.storage.auth_key(
await Auth(
self, await self.storage.dc_id(),
await self.storage.test_mode(),
await self.storage.server_address_v6() if self.ipv6 else await self.storage.server_address(),
await self.storage.server_port()
).create()
)
self.session = Session(
self, await self.storage.dc_id(),
await self.storage.auth_key(), await self.storage.test_mode(),
await self.storage.server_address_v6() if self.ipv6 else await self.storage.server_address(),
await self.storage.server_port()
)
await self.session.start()
else:
await self.storage.user_id(r.user.id)
await self.storage.is_bot(True)
return types.User._parse(self, r.user)