Compare commits

..

4 commits

Author SHA1 Message Date
wulan17
4d1ae0b00f
pyrofork: Add offset_{date,id,topic} parameters to get_forum_topics method
Some checks are pending
Build-docs / build (push) Waiting to run
Pyrofork / build (macos-latest, 3.10) (push) Waiting to run
Pyrofork / build (macos-latest, 3.11) (push) Waiting to run
Pyrofork / build (macos-latest, 3.12) (push) Waiting to run
Pyrofork / build (macos-latest, 3.13) (push) Waiting to run
Pyrofork / build (macos-latest, 3.9) (push) Waiting to run
Pyrofork / build (ubuntu-latest, 3.10) (push) Waiting to run
Pyrofork / build (ubuntu-latest, 3.11) (push) Waiting to run
Pyrofork / build (ubuntu-latest, 3.12) (push) Waiting to run
Pyrofork / build (ubuntu-latest, 3.13) (push) Waiting to run
Pyrofork / build (ubuntu-latest, 3.9) (push) Waiting to run
Signed-off-by: wulan17 <wulan17@nusantararom.org>
2025-03-03 00:32:37 +07:00
wulan17
943a7e0342
pyrofork: Add support for multi-line blockquote in markdown unparser
Signed-off-by: wulan17 <wulan17@nusantararom.org>
2025-03-03 00:32:37 +07:00
wulan17
4a5af71d25
pyrofork: Adapt markdown unparser from telethon
* The problem with the current implementation is that when we have nested markdown inside a URL, the markdown order is messed up.
For example, a link with bold text will be unparsed like this: [**github](https://github.com**).

Signed-off-by: wulan17 <wulan17@nusantararom.org>
2025-03-03 00:31:57 +07:00
wulan17
984abd2008
pyrofork: Fix nested url markdown parsing
* The problem with the current implementation is that when we add other markdown inside a URL markdown, it will not be parsed.
For example, if we add bold (**) markdown inside a URL markdown, the URL text shows as `**text**` instead of making the text bold.

Signed-off-by: wulan17 <wulan17@nusantararom.org>
2025-03-03 00:01:20 +07:00
3 changed files with 157 additions and 173 deletions

View file

@ -32,7 +32,10 @@ class GetForumTopics:
async def get_forum_topics( async def get_forum_topics(
self: "pyrogram.Client", self: "pyrogram.Client",
chat_id: Union[int, str], chat_id: Union[int, str],
limit: int = 0 limit: int = 0,
offset_date: int = 0,
offset_id: int = 0,
offset_topic: int = 0
) -> Optional[AsyncGenerator["types.ForumTopic", None]]: ) -> Optional[AsyncGenerator["types.ForumTopic", None]]:
"""Get one or more topic from a chat. """Get one or more topic from a chat.
@ -46,6 +49,15 @@ class GetForumTopics:
limit (``int``, *optional*): limit (``int``, *optional*):
Limits the number of topics to be retrieved. Limits the number of topics to be retrieved.
offset_date (``int``, *optional*):
Date of the last message of the last found topic.
offset_id (``int``, *optional*):
ID of the last message of the last found topic.
offset_topic (``int``, *optional*):
ID of the last found topic.
Returns: Returns:
``Generator``: On success, a generator yielding :obj:`~pyrogram.types.ForumTopic` objects is returned. ``Generator``: On success, a generator yielding :obj:`~pyrogram.types.ForumTopic` objects is returned.
@ -62,7 +74,7 @@ class GetForumTopics:
peer = await self.resolve_peer(chat_id) peer = await self.resolve_peer(chat_id)
rpc = raw.functions.channels.GetForumTopics(channel=peer, offset_date=0, offset_id=0, offset_topic=0, limit=limit) rpc = raw.functions.channels.GetForumTopics(channel=peer, offset_date=offset_date, offset_id=offset_id, offset_topic=offset_topic, limit=limit)
r = await self.invoke(rpc, sleep_threshold=-1) r = await self.invoke(rpc, sleep_threshold=-1)

View file

@ -18,12 +18,12 @@
# along with Pyrofork. If not, see <http://www.gnu.org/licenses/>. # along with Pyrofork. If not, see <http://www.gnu.org/licenses/>.
import html import html
import logging
import re import re
from typing import Optional from typing import Optional
import pyrogram import pyrogram
from pyrogram.enums import MessageEntityType from pyrogram.enums import MessageEntityType
from . import utils from . import utils
from .html import HTML from .html import HTML
@ -36,9 +36,8 @@ CODE_DELIM = "`"
PRE_DELIM = "```" PRE_DELIM = "```"
BLOCKQUOTE_DELIM = ">" BLOCKQUOTE_DELIM = ">"
BLOCKQUOTE_EXPANDABLE_DELIM = "**>" BLOCKQUOTE_EXPANDABLE_DELIM = "**>"
BLOCKQUOTE_EXPANDABLE_END_DELIM = "||"
MARKDOWN_RE = re.compile(r"({d})|(!?)\[(.+?)\]\((.+?)\)".format( MARKDOWN_RE = re.compile(r"({d})".format(
d="|".join( d="|".join(
["".join(i) for i in [ ["".join(i) for i in [
[rf"\{j}" for j in i] [rf"\{j}" for j in i]
@ -53,123 +52,74 @@ MARKDOWN_RE = re.compile(r"({d})|(!?)\[(.+?)\]\((.+?)\)".format(
] ]
]] ]]
))) )))
URL_RE = re.compile(r"(!?)\[(.+?)\]\((.+?)\)")
OPENING_TAG = "<{}>" OPENING_TAG = "<{}>"
CLOSING_TAG = "</{}>" CLOSING_TAG = "</{}>"
URL_MARKUP = '<a href="{}">{}</a>' URL_MARKUP = '<a href="{}">{}</a>'
EMOJI_MARKUP = "<emoji id={}>{}</emoji>" EMOJI_MARKUP = '<emoji id={}>{}</emoji>'
FIXED_WIDTH_DELIMS = [CODE_DELIM, PRE_DELIM] FIXED_WIDTH_DELIMS = [CODE_DELIM, PRE_DELIM]
CODE_TAG_RE = re.compile(r"<code>.*?</code>")
class Markdown: class Markdown:
def __init__(self, client: Optional["pyrogram.Client"]): def __init__(self, client: Optional["pyrogram.Client"]):
self.html = HTML(client) self.html = HTML(client)
@staticmethod def blockquote_parser(self, text):
def escape_and_create_quotes(text: str, strict: bool): text = re.sub(r'\n&gt;', '\n>', re.sub(r'^&gt;', '>', text))
text_lines: list[str | None] = text.splitlines() lines = text.split('\n')
result = []
# Indexes of Already escaped lines in_blockquote = False
html_escaped_list: list[int] = []
# Temporary Queue to hold lines to be quoted
to_quote_list: list[tuple[int, str]] = []
def create_blockquote(expandable: bool = False) -> None:
"""
Merges all lines in quote_queue into first line of queue
Encloses that line in html quote
Replaces rest of the lines with None placeholders to preserve indexes
"""
if len(to_quote_list) == 0:
return
joined_lines = "\n".join([i[1] for i in to_quote_list])
first_line_index, _ = to_quote_list[0]
text_lines[first_line_index] = (
f"<blockquote{' expandable' if expandable else ''}>{joined_lines}</blockquote>"
)
for line_to_remove in to_quote_list[1:]:
text_lines[line_to_remove[0]] = None
to_quote_list.clear()
# Handle Expandable Quote
inside_blockquote = False
for index, line in enumerate(text_lines):
if line.startswith(BLOCKQUOTE_EXPANDABLE_DELIM) and not inside_blockquote:
delim_stripped_line = line[len(BLOCKQUOTE_EXPANDABLE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_EXPANDABLE_DELIM} ") else 0) :]
parsed_line = (
html.escape(delim_stripped_line) if strict else delim_stripped_line
)
to_quote_list.append((index, parsed_line))
html_escaped_list.append(index)
inside_blockquote = True
continue
elif line.endswith(BLOCKQUOTE_EXPANDABLE_END_DELIM) and inside_blockquote:
if line.startswith(BLOCKQUOTE_DELIM):
line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
delim_stripped_line = line[:-len(BLOCKQUOTE_EXPANDABLE_END_DELIM)]
parsed_line = (
html.escape(delim_stripped_line) if strict else delim_stripped_line
)
to_quote_list.append((index, parsed_line))
html_escaped_list.append(index)
inside_blockquote = False
create_blockquote(expandable=True)
if inside_blockquote:
parsed_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
parsed_line = html.escape(parsed_line) if strict else parsed_line
to_quote_list.append((index, parsed_line))
html_escaped_list.append(index)
# Handle Single line/Continued Quote
for index, line in enumerate(text_lines):
if line is None:
continue
for line in lines:
if line.startswith(BLOCKQUOTE_DELIM): if line.startswith(BLOCKQUOTE_DELIM):
delim_stripped_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :] if not in_blockquote:
parsed_line = ( line = re.sub(r'^> ', OPENING_TAG.format("blockquote"), line)
html.escape(delim_stripped_line) if strict else delim_stripped_line line = re.sub(r'^>', OPENING_TAG.format("blockquote"), line)
) in_blockquote = True
result.append(line.strip())
else:
result.append(line[1:].strip())
elif line.startswith(BLOCKQUOTE_EXPANDABLE_DELIM):
if not in_blockquote:
line = re.sub(r'^\*\*> ', OPENING_TAG.format("blockquote expandable"), line)
line = re.sub(r'^\*\*>', OPENING_TAG.format("blockquote expandable"), line)
in_blockquote = True
result.append(line.strip())
else:
result.append(line[3:].strip())
else:
if in_blockquote:
line = CLOSING_TAG.format("blockquote") + line
in_blockquote = False
result.append(line)
to_quote_list.append((index, parsed_line)) if in_blockquote:
html_escaped_list.append(index) line = result[len(result)-1] + CLOSING_TAG.format("blockquote")
result.pop(len(result)-1)
result.append(line)
elif len(to_quote_list) > 0: return '\n'.join(result)
create_blockquote()
else:
create_blockquote()
if strict:
for idx, line in enumerate(text_lines):
if idx not in html_escaped_list:
text_lines[idx] = html.escape(line)
return "\n".join(
[valid_line for valid_line in text_lines if valid_line is not None]
)
async def parse(self, text: str, strict: bool = False): async def parse(self, text: str, strict: bool = False):
text = self.escape_and_create_quotes(text, strict=strict) if strict:
text = html.escape(text)
text = self.blockquote_parser(text)
delims = set() delims = set()
is_fixed_width = False is_fixed_width = False
placeholders = {}
for i, code_section in enumerate(CODE_TAG_RE.findall(text)):
placeholder = f"{{CODE_SECTION_{i}}}"
placeholders[placeholder] = code_section
text = text.replace(code_section, placeholder, 1)
for i, match in enumerate(re.finditer(MARKDOWN_RE, text)): for i, match in enumerate(re.finditer(MARKDOWN_RE, text)):
start, _ = match.span() start, _ = match.span()
delim, is_emoji, text_url, url = match.groups() delim = match.group(1)
full = match.group(0) full = match.group(0)
if delim in FIXED_WIDTH_DELIMS: if delim in FIXED_WIDTH_DELIMS:
@ -178,16 +128,6 @@ class Markdown:
if is_fixed_width and delim not in FIXED_WIDTH_DELIMS: if is_fixed_width and delim not in FIXED_WIDTH_DELIMS:
continue continue
if not is_emoji and text_url:
text = utils.replace_once(text, full, URL_MARKUP.format(url, text_url), start)
continue
if is_emoji:
emoji = text_url
emoji_id = url.lstrip("tg://emoji?id=")
text = utils.replace_once(text, full, EMOJI_MARKUP.format(emoji_id, emoji), start)
continue
if delim == BOLD_DELIM: if delim == BOLD_DELIM:
tag = "b" tag = "b"
elif delim == ITALIC_DELIM: elif delim == ITALIC_DELIM:
@ -220,79 +160,98 @@ class Markdown:
text = utils.replace_once(text, delim, tag, start) text = utils.replace_once(text, delim, tag, start)
for i, match in enumerate(re.finditer(URL_RE, text)):
start, _ = match.span()
is_emoji, text_url, url = match.groups()
full = match.group(0)
if not is_emoji and text_url:
text = utils.replace_once(text, full, URL_MARKUP.format(url, text_url), start)
continue
if is_emoji:
emoji = text_url
emoji_id = url.lstrip("tg://emoji?id=")
text = utils.replace_once(text, full, EMOJI_MARKUP.format(emoji_id, emoji), start)
continue
for placeholder, code_section in placeholders.items():
text = text.replace(placeholder, code_section)
return await self.html.parse(text) return await self.html.parse(text)
@staticmethod @staticmethod
def unparse(text: str, entities: list): def unparse(text: str, entities: list):
"""
Performs the reverse operation to .parse(), effectively returning
markdown-like syntax given a normal text and its MessageEntity's.
:param text: the text to be reconverted into markdown.
:param entities: list of MessageEntity's applied to the text.
:return: a markdown-like text representing the combination of both inputs.
"""
delimiters = {
MessageEntityType.BOLD: BOLD_DELIM,
MessageEntityType.ITALIC: ITALIC_DELIM,
MessageEntityType.UNDERLINE: UNDERLINE_DELIM,
MessageEntityType.STRIKETHROUGH: STRIKE_DELIM,
MessageEntityType.CODE: CODE_DELIM,
MessageEntityType.PRE: PRE_DELIM,
MessageEntityType.BLOCKQUOTE: BLOCKQUOTE_DELIM,
MessageEntityType.EXPANDABLE_BLOCKQUOTE: BLOCKQUOTE_EXPANDABLE_DELIM,
MessageEntityType.SPOILER: SPOILER_DELIM
}
text = utils.add_surrogates(text) text = utils.add_surrogates(text)
entities_offsets = [] insert_at = []
for i, entity in enumerate(entities):
for entity in entities: s = entity.offset
entity_type = entity.type e = entity.offset + entity.length
start = entity.offset delimiter = delimiters.get(entity.type, None)
end = start + entity.length if delimiter:
if entity.type != MessageEntityType.BLOCKQUOTE and entity.type != MessageEntityType.EXPANDABLE_BLOCKQUOTE:
if entity_type == MessageEntityType.BOLD: open_delimiter = delimiter
start_tag = end_tag = BOLD_DELIM close_delimiter = delimiter
elif entity_type == MessageEntityType.ITALIC: if entity.type == MessageEntityType.PRE:
start_tag = end_tag = ITALIC_DELIM if entity.language:
elif entity_type == MessageEntityType.UNDERLINE: open_delimiter += entity.language + '\n'
start_tag = end_tag = UNDERLINE_DELIM else:
elif entity_type == MessageEntityType.STRIKETHROUGH: open_delimiter += entity + '\n'
start_tag = end_tag = STRIKE_DELIM insert_at.append((s, i, open_delimiter))
elif entity_type == MessageEntityType.CODE: insert_at.append((e, -i, close_delimiter))
start_tag = end_tag = CODE_DELIM else:
elif entity_type == MessageEntityType.PRE: # Handle multiline blockquotes
language = getattr(entity, "language", "") or "" text_subset = text[s:e]
start_tag = f"{PRE_DELIM}{language}\n" lines = text_subset.splitlines()
end_tag = f"\n{PRE_DELIM}" for line_num, line in enumerate(lines):
elif entity_type == MessageEntityType.BLOCKQUOTE: line_start = s + sum(len(l) + 1 for l in lines[:line_num])
start_tag = BLOCKQUOTE_DELIM + " " if entity.collapsed:
end_tag = "" insert_at.append((line_start, i, BLOCKQUOTE_EXPANDABLE_DELIM))
blockquote_text = text[start:end] else:
lines = blockquote_text.split("\n") insert_at.append((line_start, i, BLOCKQUOTE_DELIM))
last_length = 0 # No closing delimiter for blockquotes
for line in lines:
if len(line) == 0 and last_length == end:
continue
start_offset = start+last_length
last_length = last_length+len(line)
end_offset = start_offset+last_length
entities_offsets.append((start_tag, start_offset,))
entities_offsets.append((end_tag, end_offset,))
last_length = last_length+1
continue
elif entity_type == MessageEntityType.SPOILER:
start_tag = end_tag = SPOILER_DELIM
elif entity_type == MessageEntityType.TEXT_LINK:
url = entity.url
start_tag = "["
end_tag = f"]({url})"
elif entity_type == MessageEntityType.TEXT_MENTION:
user = entity.user
start_tag = "["
end_tag = f"](tg://user?id={user.id})"
elif entity_type == MessageEntityType.CUSTOM_EMOJI:
emoji_id = entity.custom_emoji_id
start_tag = "!["
end_tag = f"](tg://emoji?id={emoji_id})"
else: else:
continue url = None
if entity.type == MessageEntityType.TEXT_LINK:
url = entity.url
elif entity.type == MessageEntityType.TEXT_MENTION:
url = 'tg://user?id={}'.format(entity.user.id)
if url:
insert_at.append((s, i, '['))
insert_at.append((e, -i, ']({})'.format(url)))
entities_offsets.append((start_tag, start,)) insert_at.sort(key=lambda t: (t[0], t[1]))
entities_offsets.append((end_tag, end,)) while insert_at:
at, _, what = insert_at.pop()
entities_offsets = map( # If we are in the middle of a surrogate nudge the position by -1.
lambda x: x[1], # Otherwise we would end up with malformed text and fail to encode.
sorted( # For example of bad input: "Hi \ud83d\ude1c"
enumerate(entities_offsets), # https://en.wikipedia.org/wiki/UTF-16#U+010000_to_U+10FFFF
key=lambda x: (x[1][1], x[0]), while utils.within_surrogate(text, at):
reverse=True at += 1
)
)
for entity, offset in entities_offsets: text = text[:at] + what + text[at:]
text = text[:offset] + entity + text[offset:]
return utils.remove_surrogates(text) return utils.remove_surrogates(text)

View file

@ -40,3 +40,16 @@ def remove_surrogates(text):
def replace_once(source: str, old: str, new: str, start: int): def replace_once(source: str, old: str, new: str, start: int):
return source[:start] + source[start:].replace(old, new, 1) return source[:start] + source[start:].replace(old, new, 1)
def within_surrogate(text, index, *, length=None):
    """
    Return ``True`` if ``index`` is within a surrogate (before and after it, not at!).

    "Within" means that inserting something at ``index`` would separate a
    high surrogate code unit at ``index - 1`` from the surrogate code unit
    that follows it at ``index``.

    Parameters:
        text (``str``):
            Text in which astral characters are represented as UTF-16
            surrogate code units.

        index (``int``):
            Insertion position to test.

        length (``int``, *optional*):
            Upper bound for ``index``. Defaults to ``len(text)``. Values
            larger than ``len(text)`` are clamped so the lookup below can
            never go out of range.

    Returns:
        ``bool``: ``True`` when ``index`` sits between the two halves of a
        surrogate pair, ``False`` otherwise (including out-of-range indexes).
    """
    text_length = len(text)
    if length is None or length > text_length:
        length = text_length

    return (
        # In bounds. The lower bound is 0 (not 1): a pair occupying
        # positions 0 and 1 is split by an insertion at index 1.
        0 < index < length
        # Previous code unit is a high (leading) surrogate.
        and '\ud800' <= text[index - 1] <= '\udbff'
        # Current code unit is a surrogate.
        and '\ud800' <= text[index] <= '\udfff'
    )