pyrofork: Refactor Qr Code Signin

Signed-off-by: wulan17 <wulan17@komodos.id>
This commit is contained in:
wulan17 2025-05-15 23:16:52 +07:00
parent 99e6005cf1
commit 5e35c47b61
No known key found for this signature in database
GPG key ID: 737814D4B5FF0420
3 changed files with 22 additions and 41 deletions

View file

@ -39,6 +39,7 @@ import pyrogram
from pyrogram import __version__, __license__ from pyrogram import __version__, __license__
from pyrogram import enums from pyrogram import enums
from pyrogram import raw from pyrogram import raw
from pyrogram import types
from pyrogram import utils from pyrogram import utils
from pyrogram.crypto import aes from pyrogram.crypto import aes
from pyrogram.errors import CDNFileHashMismatch from pyrogram.errors import CDNFileHashMismatch
@ -510,15 +511,14 @@ class Client(Methods):
print(e.MESSAGE) print(e.MESSAGE)
self.password = None self.password = None
else: else:
if self.use_qrcode and isinstance(signed_in, raw.types.auth.LoginToken): if self.use_qrcode and isinstance(signed_in, types.LoginToken):
time_out = signed_in.expires - datetime.timestamp(datetime.now()) time_out = signed_in.expires - datetime.timestamp(datetime.now())
try: try:
await asyncio.wait_for(self._wait_for_update_login_token(), timeout=time_out) await asyncio.wait_for(self._wait_for_update_login_token(), timeout=time_out)
except asyncio.TimeoutError: except asyncio.TimeoutError:
print("QR code expired, Requesting new Qr code...") print("QR code expired, Requesting new Qr code...")
continue continue
else: break
break
if isinstance(signed_in, User): if isinstance(signed_in, User):
return signed_in return signed_in

View file

@ -28,14 +28,6 @@ from pyrogram.session import Session, Auth
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
QRCODE_AVAIL = False
try:
import qrcode
QRCODE_AVAIL = True
except ImportError:
QRCODE_AVAIL = False
class SignInQrcode: class SignInQrcode:
async def sign_in_qrcode( async def sign_in_qrcode(
self: "pyrogram.Client" self: "pyrogram.Client"
@ -54,7 +46,9 @@ class SignInQrcode:
SessionPasswordNeeded: In case a password is needed to sign in. SessionPasswordNeeded: In case a password is needed to sign in.
""" """
if not QRCODE_AVAIL: try:
import qrcode
except ImportError:
raise ImportError("qrcode is missing! " raise ImportError("qrcode is missing! "
"Please install it with `pip install qrcode`") "Please install it with `pip install qrcode`")
r = await self.session.invoke( r = await self.session.invoke(
@ -79,13 +73,13 @@ class SignInQrcode:
print("Scan the QR code with your Telegram app.") print("Scan the QR code with your Telegram app.")
qr.print_ascii() qr.print_ascii()
return r return types.LoginToken._parse(r)
elif isinstance(r, raw.types.auth.LoginTokenSuccess): if isinstance(r, raw.types.auth.LoginTokenSuccess):
await self.storage.user_id(r.authorization.user.id) await self.storage.user_id(r.authorization.user.id)
await self.storage.is_bot(False) await self.storage.is_bot(False)
return types.User._parse(self, r.authorization.user) return types.User._parse(self, r.authorization.user)
elif isinstance(r, raw.types.auth.LoginTokenMigrateTo): if isinstance(r, raw.types.auth.LoginTokenMigrateTo):
# pylint: disable=access-member-before-definition # pylint: disable=access-member-before-definition
await self.session.stop() await self.session.stop()
@ -107,33 +101,11 @@ class SignInQrcode:
token=r.token token=r.token
) )
) )
if isinstance(r, raw.types.auth.LoginToken): if isinstance(r, raw.types.auth.LoginTokenSuccess):
base64_token = b64encode(r.token).decode("utf-8")
login_url = f"tg://login?token={base64_token}"
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(login_url)
qr.make(fit=True)
print("Scan the QR code below with your Telegram app.")
qr.print_ascii()
return types.LoginToken(
self,
r.token,
r.expires
)
elif isinstance(r, raw.types.auth.LoginTokenSuccess):
await self.storage.user_id(r.authorization.user.id) await self.storage.user_id(r.authorization.user.id)
await self.storage.is_bot(False) await self.storage.is_bot(False)
return types.User._parse(self, r.authorization.user) return types.User._parse(self, r.authorization.user)
else: raise pyrogram.exceptions.RPCError(
raise pyrogram.exceptions.RPCError( "Unknown response type from Telegram API"
"Unknown response type from Telegram API" )
)
return r

View file

@ -18,6 +18,8 @@
from ..object import Object from ..object import Object
from pyrogram import raw
class LoginToken(Object): class LoginToken(Object):
"""Contains info on a login token. """Contains info on a login token.
@ -35,3 +37,10 @@ class LoginToken(Object):
self.token = token self.token = token
self.expires = expires self.expires = expires
@staticmethod
def _parse(login_token: "raw.base.LoginToken") -> "LoginToken":
return LoginToken(
token=login_token.token,
expires=login_token.expires,
)