pyrofork: Fix recovering gaps in case of private channel

Signed-off-by: wulan17 <wulan17@nusantararom.org>
This commit is contained in:
KurimuzonAkuma 2024-03-10 20:43:00 +03:00 committed by wulan17
parent 4aa4d1a74a
commit a592c984fa
No known key found for this signature in database
GPG key ID: 318CD6CD3A6AC0A5
4 changed files with 37 additions and 27 deletions

View file

@ -616,7 +616,7 @@ class Client(Methods):
if pts: if pts:
await self.storage.update_state( await self.storage.update_state(
( (
utils.get_channel_id(channel_id) if channel_id else self.me.id, utils.get_channel_id(channel_id) if channel_id else 0,
pts, pts,
None, None,
updates.date, updates.date,
@ -656,7 +656,7 @@ class Client(Methods):
elif isinstance(updates, (raw.types.UpdateShortMessage, raw.types.UpdateShortChatMessage)): elif isinstance(updates, (raw.types.UpdateShortMessage, raw.types.UpdateShortChatMessage)):
await self.storage.update_state( await self.storage.update_state(
( (
self.me.id, 0,
updates.pts, updates.pts,
None, None,
updates.date, updates.date,

View file

@ -23,6 +23,7 @@ import logging
from collections import OrderedDict from collections import OrderedDict
import pyrogram import pyrogram
from pyrogram import errors
from pyrogram import utils from pyrogram import utils
from pyrogram import raw from pyrogram import raw
from pyrogram.handlers import ( from pyrogram.handlers import (
@ -268,26 +269,27 @@ class Dispatcher:
prev_pts = 0 prev_pts = 0
while True: while True:
diff = await self.client.invoke( try:
raw.functions.updates.GetDifference( diff = await self.client.invoke(
pts=local_pts, raw.functions.updates.GetChannelDifference(
date=local_date, channel=await self.client.resolve_peer(id),
qts=0 filter=raw.types.ChannelMessagesFilterEmpty(),
) if id == self.client.me.id else pts=local_pts,
raw.functions.updates.GetChannelDifference( limit=10000
channel=await self.client.resolve_peer(id), ) if id < 0 else
filter=raw.types.ChannelMessagesFilterEmpty(), raw.functions.updates.GetDifference(
pts=local_pts, pts=local_pts,
limit=10000 date=local_date,
qts=0
)
) )
) except (errors.ChannelPrivate, errors.ChannelInvalid):
break
if isinstance(diff, (raw.types.updates.DifferenceEmpty, raw.types.updates.ChannelDifferenceEmpty)): if isinstance(diff, raw.types.updates.DifferenceEmpty):
break break
elif isinstance(diff, (raw.types.updates.DifferenceTooLong, raw.types.updates.ChannelDifferenceTooLong)): elif isinstance(diff, raw.types.updates.DifferenceTooLong):
break break
elif isinstance(diff, raw.types.updates.ChannelDifference):
local_pts = diff.pts
elif isinstance(diff, raw.types.updates.Difference): elif isinstance(diff, raw.types.updates.Difference):
local_pts = diff.state.pts local_pts = diff.state.pts
elif isinstance(diff, raw.types.updates.DifferenceSlice): elif isinstance(diff, raw.types.updates.DifferenceSlice):
@ -298,6 +300,12 @@ class Dispatcher:
break break
prev_pts = local_pts prev_pts = local_pts
elif isinstance(diff, raw.types.updates.ChannelDifferenceEmpty):
break
elif isinstance(diff, raw.types.updates.ChannelDifferenceTooLong):
break
elif isinstance(diff, raw.types.updates.ChannelDifference):
local_pts = diff.pts
users = {i.id: i for i in diff.users} users = {i.id: i for i in diff.users}
chats = {i.id: i for i in diff.chats} chats = {i.id: i for i in diff.chats}
@ -330,7 +338,8 @@ class Dispatcher:
if isinstance(diff, (raw.types.updates.Difference, raw.types.updates.ChannelDifference)): if isinstance(diff, (raw.types.updates.Difference, raw.types.updates.ChannelDifference)):
break break
await self.client.storage.update_state(None) await self.client.storage.update_state(id)
log.info("Recovered %s messages and %s updates.", message_updates_counter, other_updates_counter) log.info("Recovered %s messages and %s updates.", message_updates_counter, other_updates_counter)
async def stop(self): async def stop(self):

View file

@ -173,8 +173,8 @@ class MongoStorage(Storage):
states = [[state['_id'],state['pts'],state['qts'],state['date'],state['seq']] async for state in self._states.find()] states = [[state['_id'],state['pts'],state['qts'],state['date'],state['seq']] async for state in self._states.find()]
return states if len(states) > 0 else None return states if len(states) > 0 else None
else: else:
if value is None: if isinstance(value, int):
await self._states.drop() await self._states.delete_one({'id': value})
else: else:
await self._states.update_one({'_id': value[0]}, {'$set': {'pts': value[1], 'qts': value[2], 'date': value[3], 'seq': value[4]}}, upsert=True) await self._states.update_one({'_id': value[0]}, {'$set': {'pts': value[1], 'qts': value[2], 'date': value[3], 'seq': value[4]}}, upsert=True)

View file

@ -182,16 +182,17 @@ class SQLiteStorage(Storage):
).fetchall() ).fetchall()
else: else:
with self.conn: with self.conn:
if value is None: if isinstance(value, int):
self.conn.execute( self.conn.execute(
"DELETE FROM update_state" "DELETE FROM update_state WHERE id = ?",
(value,)
) )
else: else:
self.conn.execute( self.conn.execute(
"REPLACE INTO update_state (id, pts, qts, date, seq)" "REPLACE INTO update_state (id, pts, qts, date, seq)"
"VALUES (?, ?, ?, ?, ?)", "VALUES (?, ?, ?, ?, ?)",
value value
) )
async def get_peer_by_id(self, peer_id: int): async def get_peer_by_id(self, peer_id: int):
r = self.conn.execute( r = self.conn.execute(