Pyrofork: Add Mongodb Session Storage

Signed-off-by: wulan17 <wulan17@nusantararom.org>
Co-authored-by: wulan17 <wulan17@nusantararom.org>

Pyrofork: Use session name as database name, add some parameters informations and some cleanup (#3)

Changes to be committed:
	modified:   pyrogram/client.py
	modified:   pyrogram/storage/mongo_storage.py

Signed-off-by: wulan17 <wulan17@nusantararom.org>

PyroFork: storage: mongo: Use existing database connection

support both async_pymongo and motor

Signed-off-by: wulan17 <wulan17@nusantararom.org>

PyroFork: Use Dummy client object to check wether connection object is valid or not

Signed-off-by: wulan17 <wulan17@nusantararom.org>
This commit is contained in:
Animesh Murmu 2023-05-10 18:50:44 +07:00 committed by wulan17
parent d91cf7f120
commit b43b20351f
No known key found for this signature in database
GPG key ID: 318CD6CD3A6AC0A5
6 changed files with 271 additions and 1 deletions

View file

@ -61,6 +61,34 @@ In case you don't want to have any session file saved to disk, you can use an in
This storage engine is still backed by SQLite, but the database exists purely in memory. This means that, once you stop This storage engine is still backed by SQLite, but the database exists purely in memory. This means that, once you stop
a client, the entire database is discarded and the session details used for logging in again will be lost forever. a client, the entire database is discarded and the session details used for logging in again will be lost forever.
Mongodb Storage
^^^^^^^^^^^^^^^
In case you want to have persistent session but you don't have persistent storage you can use mongodb storage by passing
mongodb config as ``dict`` to the ``mongodb`` parameter of the :obj:`~pyrogram.Client` constructor:
Using async_pymongo (Recommended for python3.9+):
.. code-block:: python
from async_pymongo import AsyncClient
from pyrogram import Client
conn = AsyncClient("mongodb://...")
async with Client("my_account", mongodb=dict(connection=conn, remove_peers=False)) as app:
print(await app.get_me())
Using motor:
.. code-block:: python
from motor.motor_asyncio import AsyncIOMotorClient
from pyrogram import Client
conn = AsyncIOMotorClient("mongodb://...")
async with Client("my_account", mongodb=dict(connection=conn, remove_peers=False)) as app:
print(await app.get_me())
This storage engine is backed by MongoDB, a session will be created and saved to mongodb database. Any subsequent client
restart will make PyroFork search for a database named that way and the session database will be automatically loaded.
Session Strings Session Strings
--------------- ---------------

View file

@ -50,7 +50,7 @@ from pyrogram.errors import (
from pyrogram.handlers.handler import Handler from pyrogram.handlers.handler import Handler
from pyrogram.methods import Methods from pyrogram.methods import Methods
from pyrogram.session import Auth, Session from pyrogram.session import Auth, Session
from pyrogram.storage import FileStorage, MemoryStorage from pyrogram.storage import FileStorage, MemoryStorage, MongoStorage
from pyrogram.types import User, TermsOfService from pyrogram.types import User, TermsOfService
from pyrogram.utils import ainput from pyrogram.utils import ainput
from .dispatcher import Dispatcher from .dispatcher import Dispatcher
@ -126,6 +126,10 @@ class Client(Methods):
pass to the ``session_string`` parameter. pass to the ``session_string`` parameter.
Defaults to False. Defaults to False.
mongodb (``dict``, *optional*):
Mongodb config as dict, e.g.: *dict(connection=async_pymongo.AsyncClient("mongodb://..."), remove_peers=False)*.
Only applicable for new sessions.
phone_number (``str``, *optional*): phone_number (``str``, *optional*):
Pass the phone number as string (with the Country Code prefix included) to avoid entering it manually. Pass the phone number as string (with the Country Code prefix included) to avoid entering it manually.
Only applicable for new sessions. Only applicable for new sessions.
@ -220,6 +224,7 @@ class Client(Methods):
bot_token: str = None, bot_token: str = None,
session_string: str = None, session_string: str = None,
in_memory: bool = None, in_memory: bool = None,
mongodb: dict = None,
phone_number: str = None, phone_number: str = None,
phone_code: str = None, phone_code: str = None,
password: str = None, password: str = None,
@ -249,6 +254,7 @@ class Client(Methods):
self.bot_token = bot_token self.bot_token = bot_token
self.session_string = session_string self.session_string = session_string
self.in_memory = in_memory self.in_memory = in_memory
self.mongodb = mongodb
self.phone_number = phone_number self.phone_number = phone_number
self.phone_code = phone_code self.phone_code = phone_code
self.password = password self.password = password
@ -268,6 +274,8 @@ class Client(Methods):
self.storage = MemoryStorage(self.name, self.session_string) self.storage = MemoryStorage(self.name, self.session_string)
elif self.in_memory: elif self.in_memory:
self.storage = MemoryStorage(self.name) self.storage = MemoryStorage(self.name)
elif self.mongodb:
self.storage = MongoStorage(self.name, **self.mongodb)
else: else:
self.storage = FileStorage(self.name, self.workdir) self.storage = FileStorage(self.name, self.workdir)

View file

@ -18,4 +18,5 @@
from .file_storage import FileStorage from .file_storage import FileStorage
from .memory_storage import MemoryStorage from .memory_storage import MemoryStorage
from .mongo_storage import MongoStorage
from .storage import Storage from .storage import Storage

View file

@ -0,0 +1,44 @@
from pymongo.client_session import TransactionOptions
from bson.codec_options import CodecOptions
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import (
Nearest,
Primary,
PrimaryPreferred,
Secondary,
SecondaryPreferred,
)
from pymongo.write_concern import WriteConcern
from typing import Any, Optional, Union
try:
from typing import Protocol, runtime_checkable
except ImportError:
from typing_extensions import Protocol, runtime_checkable
ReadPreferences = Union[Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest]
@runtime_checkable
class DummyMongoClient(Protocol):
def __init__(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError
def get_database(
self,
name: Optional[str] = None,
*,
codec_options: Optional[CodecOptions] = None,
read_preference: Optional[ReadPreferences] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
):
raise NotImplementedError
async def start_session(
self,
*,
causal_consistency: Optional[bool] = None,
default_transaction_options: Optional[TransactionOptions] = None,
snapshot: bool = False,
):
raise NotImplementedError

View file

@ -0,0 +1,188 @@
import asyncio
import inspect
import time
from typing import List, Tuple, Any
from .dummy_client import DummyMongoClient
from pymongo import MongoClient, UpdateOne
from pyrogram.storage.storage import Storage
from pyrogram.storage.sqlite_storage import get_input_peer
class MongoStorage(Storage):
"""
Initializes a new session.
Parameters:
- name (`str`):
The session name used for database name.
- connection (`obj`):
Mongodb connections object.
~async_pymongo.AsyncClient or ~motor.motor_asyncio.AsyncIOMotorClient object
- remove_peers (`bool`, *optional*):
Flag to remove data in the peers collection. If set to True,
the data related to peers will be removed everytime client log out.
If set to False or None, the data will not be removed.
Example:
import async_pymongo
conn = async_pymongo.AsyncClient("mongodb://...")
bot_db = conn["my_bot"]
session = MongoStorage("my_session", connection=conn, remove_peers=True)
"""
lock: asyncio.Lock
USERNAME_TTL = 8 * 60 * 60
def __init__(
self,
name: str,
connection: DummyMongoClient,
remove_peers: bool = False
):
super().__init__(name=name)
database = None
if isinstance(connection, DummyMongoClient):
if isinstance(connection, MongoClient):
raise Exception("Pymongo MongoClient object is not supported! please use async mongodb driver such as async_pymongo and motor.")
database = connection[name]
else:
raise Exception("Wrong connection object type! please pass valid connection object to connection parameter!")
self.lock = asyncio.Lock()
self.database = database
self._peer = database['peers']
self._session = database['session']
self._remove_peers = remove_peers
async def open(self):
"""
dc_id INTEGER PRIMARY KEY,
api_id INTEGER,
test_mode INTEGER,
auth_key BLOB,
date INTEGER NOT NULL,
user_id INTEGER,
is_bot INTEGER
"""
if await self._session.find_one({'_id': 0}, {}):
return
await self._session.insert_one(
{
'_id': 0,
'dc_id': 2,
'api_id': None,
'test_mode': None,
'auth_key': b'',
'date': 0,
'user_id': 0,
'is_bot': 0,
}
)
async def save(self):
pass
async def close(self):
pass
async def delete(self):
try:
await self._session.delete_one({'_id': 0})
if self._remove_peers:
await self._peer.remove({})
except Exception as _:
return
async def update_peers(self, peers: List[Tuple[int, int, str, str, str]]):
"""(id, access_hash, type, username, phone_number)"""
s = int(time.time())
bulk = [
UpdateOne(
{'_id': i[0]},
{'$set': {
'access_hash': i[1],
'type': i[2],
'username': i[3],
'phone_number': i[4],
'last_update_on': s
}},
upsert=True
) for i in peers
]
if not bulk:
return
await self._peer.bulk_write(
bulk
)
async def get_peer_by_id(self, peer_id: int):
# id, access_hash, type
r = await self._peer.find_one({'_id': peer_id}, {'_id': 1, 'access_hash': 1, 'type': 1})
if not r:
raise KeyError(f"ID not found: {peer_id}")
return get_input_peer(r['_id'], r['access_hash'], r['type'])
async def get_peer_by_username(self, username: str):
# id, access_hash, type, last_update_on,
r = await self._peer.find_one({'username': username},
{'_id': 1, 'access_hash': 1, 'type': 1, 'last_update_on': 1})
if r is None:
raise KeyError(f"Username not found: {username}")
if abs(time.time() - r['last_update_on']) > self.USERNAME_TTL:
raise KeyError(f"Username expired: {username}")
return get_input_peer(r['_id'], r['access_hash'], r['type'])
async def get_peer_by_phone_number(self, phone_number: str):
# _id, access_hash, type,
r = await self._peer.find_one({'phone_number': phone_number},
{'_id': 1, 'access_hash': 1, 'type': 1})
if r is None:
raise KeyError(f"Phone number not found: {phone_number}")
return get_input_peer(r['_id'], r['access_hash'], r['type'])
async def _get(self):
attr = inspect.stack()[2].function
d = await self._session.find_one({'_id': 0}, {attr: 1})
if not d:
return
return d[attr]
async def _set(self, value: Any):
attr = inspect.stack()[2].function
await self._session.update_one({'_id': 0}, {'$set': {attr: value}}, upsert=True)
async def _accessor(self, value: Any = object):
return await self._get() if value == object else await self._set(value)
async def dc_id(self, value: int = object):
return await self._accessor(value)
async def api_id(self, value: int = object):
return await self._accessor(value)
async def test_mode(self, value: bool = object):
return await self._accessor(value)
async def auth_key(self, value: bytes = object):
return await self._accessor(value)
async def date(self, value: int = object):
return await self._accessor(value)
async def user_id(self, value: int = object):
return await self._accessor(value)
async def is_bot(self, value: bool = object):
return await self._accessor(value)

View file

@ -1,2 +1,3 @@
pyaes==1.6.1 pyaes==1.6.1
pymongo==4.3.3
pysocks==1.7.1 pysocks==1.7.1