Compare commits


1 commit

Author: Mahdi K.
SHA1: be5a22766a
Date: 2025-02-27 04:39:48 +00:00

Pyrofork: Fix markdown parsing

Note: Still has some issues, like this one: https://t.me/MayuriChan_Chat/10286

Signed-off-by: Yasir Aris <git@yasir.id>
3 changed files with 173 additions and 157 deletions

View file

@@ -32,10 +32,7 @@ class GetForumTopics:
     async def get_forum_topics(
         self: "pyrogram.Client",
         chat_id: Union[int, str],
-        limit: int = 0,
-        offset_date: int = 0,
-        offset_id: int = 0,
-        offset_topic: int = 0
+        limit: int = 0
     ) -> Optional[AsyncGenerator["types.ForumTopic", None]]:
         """Get one or more topic from a chat.
@@ -49,15 +46,6 @@ class GetForumTopics:
             limit (``int``, *optional*):
                 Limits the number of topics to be retrieved.
-            offset_date (``int``, *optional*):
-                Date of the last message of the last found topic.
-            offset_id (``int``, *optional*):
-                ID of the last message of the last found topic.
-            offset_topic (``int``, *optional*):
-                ID of the last found topic.
         Returns:
             ``Generator``: On success, a generator yielding :obj:`~pyrogram.types.ForumTopic` objects is returned.
@@ -74,7 +62,7 @@ class GetForumTopics:
         peer = await self.resolve_peer(chat_id)
-        rpc = raw.functions.channels.GetForumTopics(channel=peer, offset_date=offset_date, offset_id=offset_id, offset_topic=offset_topic, limit=limit)
+        rpc = raw.functions.channels.GetForumTopics(channel=peer, offset_date=0, offset_id=0, offset_topic=0, limit=limit)
         r = await self.invoke(rpc, sleep_threshold=-1)
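
With the offset parameters gone, only limit is exposed and the raw offsets are pinned to 0. A minimal usage sketch of the simplified call, assuming an already started pyrogram.Client named `app` and an illustrative forum chat id (neither is part of this commit):

    # Sketch only: `app` and the chat id below are assumptions, not from the commit.
    async def dump_topics(app, chat_id=-1001234567890):
        # offset_date / offset_id / offset_topic are no longer accepted here.
        async for topic in app.get_forum_topics(chat_id, limit=5):
            print(topic)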

View file

@@ -18,12 +18,12 @@
 # along with Pyrofork. If not, see <http://www.gnu.org/licenses/>.

 import html
+import logging
 import re
 from typing import Optional

 import pyrogram
 from pyrogram.enums import MessageEntityType

 from . import utils
 from .html import HTML
@@ -36,8 +36,9 @@ CODE_DELIM = "`"
 PRE_DELIM = "```"
 BLOCKQUOTE_DELIM = ">"
 BLOCKQUOTE_EXPANDABLE_DELIM = "**>"
+BLOCKQUOTE_EXPANDABLE_END_DELIM = "||"

-MARKDOWN_RE = re.compile(r"({d})".format(
+MARKDOWN_RE = re.compile(r"({d})|(!?)\[(.+?)\]\((.+?)\)".format(
     d="|".join(
         ["".join(i) for i in [
             [rf"\{j}" for j in i]
@@ -52,74 +53,123 @@ MARKDOWN_RE = re.compile(r"({d})".format(
             ]
         ]]
     )))
-URL_RE = re.compile(r"(!?)\[(.+?)\]\((.+?)\)")

 OPENING_TAG = "<{}>"
 CLOSING_TAG = "</{}>"
 URL_MARKUP = '<a href="{}">{}</a>'
-EMOJI_MARKUP = '<emoji id={}>{}</emoji>'
+EMOJI_MARKUP = "<emoji id={}>{}</emoji>"

 FIXED_WIDTH_DELIMS = [CODE_DELIM, PRE_DELIM]
-CODE_TAG_RE = re.compile(r"<code>.*?</code>")


 class Markdown:
     def __init__(self, client: Optional["pyrogram.Client"]):
         self.html = HTML(client)

-    def blockquote_parser(self, text):
-        text = re.sub(r'\n&gt;', '\n>', re.sub(r'^&gt;', '>', text))
-        lines = text.split('\n')
-        result = []
-        in_blockquote = False
-
-        for line in lines:
-            if line.startswith(BLOCKQUOTE_DELIM):
-                if not in_blockquote:
-                    line = re.sub(r'^> ', OPENING_TAG.format("blockquote"), line)
-                    line = re.sub(r'^>', OPENING_TAG.format("blockquote"), line)
-                    in_blockquote = True
-                    result.append(line.strip())
-                else:
-                    result.append(line[1:].strip())
-            elif line.startswith(BLOCKQUOTE_EXPANDABLE_DELIM):
-                if not in_blockquote:
-                    line = re.sub(r'^\*\*> ', OPENING_TAG.format("blockquote expandable"), line)
-                    line = re.sub(r'^\*\*>', OPENING_TAG.format("blockquote expandable"), line)
-                    in_blockquote = True
-                    result.append(line.strip())
-                else:
-                    result.append(line[3:].strip())
-            else:
-                if in_blockquote:
-                    line = CLOSING_TAG.format("blockquote") + line
-                    in_blockquote = False
-                result.append(line)
-        if in_blockquote:
-            line = result[len(result)-1] + CLOSING_TAG.format("blockquote")
-            result.pop(len(result)-1)
-            result.append(line)
-        return '\n'.join(result)
+    @staticmethod
+    def escape_and_create_quotes(text: str, strict: bool):
+        text_lines: list[str | None] = text.splitlines()
+
+        # Indexes of Already escaped lines
+        html_escaped_list: list[int] = []
+        # Temporary Queue to hold lines to be quoted
+        to_quote_list: list[tuple[int, str]] = []
+
+        def create_blockquote(expandable: bool = False) -> None:
+            """
+            Merges all lines in quote_queue into first line of queue
+            Encloses that line in html quote
+            Replaces rest of the lines with None placeholders to preserve indexes
+            """
+            if len(to_quote_list) == 0:
+                return
+            joined_lines = "\n".join([i[1] for i in to_quote_list])
+            first_line_index, _ = to_quote_list[0]
+            text_lines[first_line_index] = (
+                f"<blockquote{' expandable' if expandable else ''}>{joined_lines}</blockquote>"
+            )
+            for line_to_remove in to_quote_list[1:]:
+                text_lines[line_to_remove[0]] = None
+            to_quote_list.clear()
+
+        # Handle Expandable Quote
+        inside_blockquote = False
+        for index, line in enumerate(text_lines):
+            if line.startswith(BLOCKQUOTE_EXPANDABLE_DELIM) and not inside_blockquote:
+                delim_stripped_line = line[len(BLOCKQUOTE_EXPANDABLE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_EXPANDABLE_DELIM} ") else 0) :]
+                parsed_line = (
+                    html.escape(delim_stripped_line) if strict else delim_stripped_line
+                )
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+                inside_blockquote = True
+                continue
+            elif line.endswith(BLOCKQUOTE_EXPANDABLE_END_DELIM) and inside_blockquote:
+                if line.startswith(BLOCKQUOTE_DELIM):
+                    line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
+                delim_stripped_line = line[:-len(BLOCKQUOTE_EXPANDABLE_END_DELIM)]
+                parsed_line = (
+                    html.escape(delim_stripped_line) if strict else delim_stripped_line
+                )
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+                inside_blockquote = False
+                create_blockquote(expandable=True)
+
+            if inside_blockquote:
+                parsed_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
+                parsed_line = html.escape(parsed_line) if strict else parsed_line
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+
+        # Handle Single line/Continued Quote
+        for index, line in enumerate(text_lines):
+            if line is None:
+                continue
+
+            if line.startswith(BLOCKQUOTE_DELIM):
+                delim_stripped_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
+                parsed_line = (
+                    html.escape(delim_stripped_line) if strict else delim_stripped_line
+                )
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+            elif len(to_quote_list) > 0:
+                create_blockquote()
+        else:
+            create_blockquote()
+
+        if strict:
+            for idx, line in enumerate(text_lines):
+                if idx not in html_escaped_list:
+                    text_lines[idx] = html.escape(line)
+
+        return "\n".join(
+            [valid_line for valid_line in text_lines if valid_line is not None]
+        )

     async def parse(self, text: str, strict: bool = False):
-        if strict:
-            text = html.escape(text)
-        text = self.blockquote_parser(text)
+        text = self.escape_and_create_quotes(text, strict=strict)

         delims = set()
         is_fixed_width = False

-        placeholders = {}
-        for i, code_section in enumerate(CODE_TAG_RE.findall(text)):
-            placeholder = f"{{CODE_SECTION_{i}}}"
-            placeholders[placeholder] = code_section
-            text = text.replace(code_section, placeholder, 1)
-
         for i, match in enumerate(re.finditer(MARKDOWN_RE, text)):
             start, _ = match.span()
-            delim = match.group(1)
+            delim, is_emoji, text_url, url = match.groups()
             full = match.group(0)

             if delim in FIXED_WIDTH_DELIMS:
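
The new pre-pass above replaces the old line-by-line blockquote_parser: it collects quoted lines, merges them into a single <blockquote> (or <blockquote expandable>) element, and leaves everything else for the regex pass. A hedged sketch of its behaviour, traced from the code above rather than taken from the commit (module path assumed):

    from pyrogram.parser.markdown import Markdown  # assumed import path

    # Consecutive "> " lines are merged into one quote; other lines pass through.
    Markdown.escape_and_create_quotes("> line one\n> line two\nplain text", strict=False)
    # expected: '<blockquote>line one\nline two</blockquote>\nplain text'

    # "**>" opens an expandable quote and a trailing "||" closes it.
    Markdown.escape_and_create_quotes("**> hidden one\n> hidden two||", strict=False)
    # expected: '<blockquote expandable>hidden one\nhidden two</blockquote>'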
@@ -128,6 +178,16 @@ class Markdown:
             if is_fixed_width and delim not in FIXED_WIDTH_DELIMS:
                 continue

+            if not is_emoji and text_url:
+                text = utils.replace_once(text, full, URL_MARKUP.format(url, text_url), start)
+                continue
+
+            if is_emoji:
+                emoji = text_url
+                emoji_id = url.lstrip("tg://emoji?id=")
+                text = utils.replace_once(text, full, EMOJI_MARKUP.format(emoji_id, emoji), start)
+                continue
+
             if delim == BOLD_DELIM:
                 tag = "b"
             elif delim == ITALIC_DELIM:
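
Inline links and custom emoji are now captured by MARKDOWN_RE itself (groups 2 to 4) instead of a separate URL_RE pass, which is what the new branch above consumes. A hedged sketch of the group layout (import path assumed):

    from pyrogram.parser.markdown import MARKDOWN_RE  # assumed import path

    MARKDOWN_RE.search("[link](https://example.com)").groups()
    # expected: (None, '', 'link', 'https://example.com')

    MARKDOWN_RE.search("![😀](tg://emoji?id=536832417067120)").groups()
    # expected: (None, '!', '😀', 'tg://emoji?id=536832417067120')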
@@ -160,98 +220,79 @@
             text = utils.replace_once(text, delim, tag, start)

-        for i, match in enumerate(re.finditer(URL_RE, text)):
-            start, _ = match.span()
-            is_emoji, text_url, url = match.groups()
-            full = match.group(0)
-            if not is_emoji and text_url:
-                text = utils.replace_once(text, full, URL_MARKUP.format(url, text_url), start)
-                continue
-            if is_emoji:
-                emoji = text_url
-                emoji_id = url.lstrip("tg://emoji?id=")
-                text = utils.replace_once(text, full, EMOJI_MARKUP.format(emoji_id, emoji), start)
-                continue
-
-        for placeholder, code_section in placeholders.items():
-            text = text.replace(placeholder, code_section)
-
         return await self.html.parse(text)

     @staticmethod
     def unparse(text: str, entities: list):
-        """
-        Performs the reverse operation to .parse(), effectively returning
-        markdown-like syntax given a normal text and its MessageEntity's.
-
-        :param text: the text to be reconverted into markdown.
-        :param entities: list of MessageEntity's applied to the text.
-        :return: a markdown-like text representing the combination of both inputs.
-        """
-        delimiters = {
-            MessageEntityType.BOLD: BOLD_DELIM,
-            MessageEntityType.ITALIC: ITALIC_DELIM,
-            MessageEntityType.UNDERLINE: UNDERLINE_DELIM,
-            MessageEntityType.STRIKETHROUGH: STRIKE_DELIM,
-            MessageEntityType.CODE: CODE_DELIM,
-            MessageEntityType.PRE: PRE_DELIM,
-            MessageEntityType.BLOCKQUOTE: BLOCKQUOTE_DELIM,
-            MessageEntityType.EXPANDABLE_BLOCKQUOTE: BLOCKQUOTE_EXPANDABLE_DELIM,
-            MessageEntityType.SPOILER: SPOILER_DELIM
-        }
         text = utils.add_surrogates(text)

-        insert_at = []
-        for i, entity in enumerate(entities):
-            s = entity.offset
-            e = entity.offset + entity.length
-            delimiter = delimiters.get(entity.type, None)
-            if delimiter:
-                if entity.type != MessageEntityType.BLOCKQUOTE and entity.type != MessageEntityType.EXPANDABLE_BLOCKQUOTE:
-                    open_delimiter = delimiter
-                    close_delimiter = delimiter
-                    if entity.type == MessageEntityType.PRE:
-                        if entity.language:
-                            open_delimiter += entity.language + '\n'
-                        else:
-                            open_delimiter += entity + '\n'
-                    insert_at.append((s, i, open_delimiter))
-                    insert_at.append((e, -i, close_delimiter))
-                else:
-                    # Handle multiline blockquotes
-                    text_subset = text[s:e]
-                    lines = text_subset.splitlines()
-                    for line_num, line in enumerate(lines):
-                        line_start = s + sum(len(l) + 1 for l in lines[:line_num])
-                        if entity.collapsed:
-                            insert_at.append((line_start, i, BLOCKQUOTE_EXPANDABLE_DELIM))
-                        else:
-                            insert_at.append((line_start, i, BLOCKQUOTE_DELIM))
-                    # No closing delimiter for blockquotes
-            else:
-                url = None
-                if entity.type == MessageEntityType.TEXT_LINK:
-                    url = entity.url
-                elif entity.type == MessageEntityType.TEXT_MENTION:
-                    url = 'tg://user?id={}'.format(entity.user.id)
-                if url:
-                    insert_at.append((s, i, '['))
-                    insert_at.append((e, -i, ']({})'.format(url)))
-
-        insert_at.sort(key=lambda t: (t[0], t[1]))
-        while insert_at:
-            at, _, what = insert_at.pop()
-            # If we are in the middle of a surrogate nudge the position by -1.
-            # Otherwise we would end up with malformed text and fail to encode.
-            # For example of bad input: "Hi \ud83d\ude1c"
-            # https://en.wikipedia.org/wiki/UTF-16#U+010000_to_U+10FFFF
-            while utils.within_surrogate(text, at):
-                at += 1
-
-            text = text[:at] + what + text[at:]
+        entities_offsets = []
+
+        for entity in entities:
+            entity_type = entity.type
+            start = entity.offset
+            end = start + entity.length
+
+            if entity_type == MessageEntityType.BOLD:
+                start_tag = end_tag = BOLD_DELIM
+            elif entity_type == MessageEntityType.ITALIC:
+                start_tag = end_tag = ITALIC_DELIM
+            elif entity_type == MessageEntityType.UNDERLINE:
+                start_tag = end_tag = UNDERLINE_DELIM
+            elif entity_type == MessageEntityType.STRIKETHROUGH:
+                start_tag = end_tag = STRIKE_DELIM
+            elif entity_type == MessageEntityType.CODE:
+                start_tag = end_tag = CODE_DELIM
+            elif entity_type == MessageEntityType.PRE:
+                language = getattr(entity, "language", "") or ""
+                start_tag = f"{PRE_DELIM}{language}\n"
+                end_tag = f"\n{PRE_DELIM}"
+            elif entity_type == MessageEntityType.BLOCKQUOTE:
+                start_tag = BLOCKQUOTE_DELIM + " "
+                end_tag = ""
+                blockquote_text = text[start:end]
+                lines = blockquote_text.split("\n")
+                last_length = 0
+                for line in lines:
+                    if len(line) == 0 and last_length == end:
+                        continue
+                    start_offset = start+last_length
+                    last_length = last_length+len(line)
+                    end_offset = start_offset+last_length
+                    entities_offsets.append((start_tag, start_offset,))
+                    entities_offsets.append((end_tag, end_offset,))
+                    last_length = last_length+1
+                continue
+            elif entity_type == MessageEntityType.SPOILER:
+                start_tag = end_tag = SPOILER_DELIM
+            elif entity_type == MessageEntityType.TEXT_LINK:
+                url = entity.url
+                start_tag = "["
+                end_tag = f"]({url})"
+            elif entity_type == MessageEntityType.TEXT_MENTION:
+                user = entity.user
+                start_tag = "["
+                end_tag = f"](tg://user?id={user.id})"
+            elif entity_type == MessageEntityType.CUSTOM_EMOJI:
+                emoji_id = entity.custom_emoji_id
+                start_tag = "!["
+                end_tag = f"](tg://emoji?id={emoji_id})"
+            else:
+                continue
+
+            entities_offsets.append((start_tag, start,))
+            entities_offsets.append((end_tag, end,))
+
+        entities_offsets = map(
+            lambda x: x[1],
+            sorted(
+                enumerate(entities_offsets),
+                key=lambda x: (x[1][1], x[0]),
+                reverse=True
+            )
+        )
+
+        for entity, offset in entities_offsets:
+            text = text[:offset] + entity + text[offset:]

         return utils.remove_surrogates(text)
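
The rewritten unparse drops the delimiters dict and the surrogate-nudging loop: it records (tag, offset) pairs per entity, emits a "> " prefix per blockquote line, then inserts the tags from the highest offset down. A hedged round-trip sketch, traced from the code above (constructor arguments and module paths assumed from the public Pyrogram API):

    from pyrogram.enums import MessageEntityType
    from pyrogram.parser.markdown import Markdown  # assumed import path
    from pyrogram.types import MessageEntity

    # One BLOCKQUOTE entity spanning the whole two-line text.
    entity = MessageEntity(type=MessageEntityType.BLOCKQUOTE, offset=0, length=11)
    Markdown.unparse("Hello\nWorld", [entity])
    # expected: '> Hello\n> World'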

View file

@@ -40,16 +40,3 @@ def remove_surrogates(text):

 def replace_once(source: str, old: str, new: str, start: int):
     return source[:start] + source[start:].replace(old, new, 1)
-
-
-def within_surrogate(text, index, *, length=None):
-    """
-    `True` if ``index`` is within a surrogate (before and after it, not at!).
-    """
-    if length is None:
-        length = len(text)
-
-    return (
-        1 < index < len(text) and  # in bounds
-        '\ud800' <= text[index - 1] <= '\udbff' and  # previous is
-        '\ud800' <= text[index] <= '\udfff'  # current is
-    )
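
within_surrogate was used by the old unparse above to nudge insertion points out of the middle of a surrogate pair and is no longer referenced after this change, so the helper is dropped. A short sketch of the surrogate round trip that add_surrogates/remove_surrogates still provide (import path assumed):

    from pyrogram.parser import utils  # assumed import path

    text = "Hi 😜 bold"
    surrogated = utils.add_surrogates(text)
    len(text), len(surrogated)                   # expected: (9, 10); the emoji becomes two UTF-16 code units
    utils.remove_surrogates(surrogated) == text  # expected: True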