Mirror of https://github.com/Mayuri-Chan/pyrofork.git (synced 2026-01-07 15:34:52 +00:00)

Compare commits: 4d1ae0b00f...be5a22766a (1 commit)
| Author | SHA1 | Date |
|---|---|---|
| | be5a22766a | |
3 changed files with 173 additions and 157 deletions
@@ -32,10 +32,7 @@ class GetForumTopics:
     async def get_forum_topics(
         self: "pyrogram.Client",
         chat_id: Union[int, str],
-        limit: int = 0,
-        offset_date: int = 0,
-        offset_id: int = 0,
-        offset_topic: int = 0
+        limit: int = 0
     ) -> Optional[AsyncGenerator["types.ForumTopic", None]]:
         """Get one or more topic from a chat.

@@ -49,15 +46,6 @@ class GetForumTopics:
             limit (``int``, *optional*):
                 Limits the number of topics to be retrieved.

-            offset_date (``int``, *optional*):
-                Date of the last message of the last found topic.
-
-            offset_id (``int``, *optional*):
-                ID of the last message of the last found topic.
-
-            offset_topic (``int``, *optional*):
-                ID of the last found topic.
-
         Returns:
             ``Generator``: On success, a generator yielding :obj:`~pyrogram.types.ForumTopic` objects is returned.

@@ -74,7 +62,7 @@ class GetForumTopics:

         peer = await self.resolve_peer(chat_id)

-        rpc = raw.functions.channels.GetForumTopics(channel=peer, offset_date=offset_date, offset_id=offset_id, offset_topic=offset_topic, limit=limit)
+        rpc = raw.functions.channels.GetForumTopics(channel=peer, offset_date=0, offset_id=0, offset_topic=0, limit=limit)

         r = await self.invoke(rpc, sleep_threshold=-1)

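The first file in this compare is the `GetForumTopics.get_forum_topics` method wrapper. A minimal usage sketch of the simplified signature, assuming a pyrofork `Client` session named "my_account" and a made-up forum chat ID (both are illustrative values, not from the commit):

```python
from pyrogram import Client

app = Client("my_account")


async def main():
    async with app:
        # After this change only chat_id and limit are exposed; the raw request
        # always sends offset_date=0, offset_id=0 and offset_topic=0.
        async for topic in app.get_forum_topics(-1001234567890, limit=10):
            print(topic)


app.run(main())
```

Callers that previously passed `offset_date`, `offset_id` or `offset_topic` have to drop those keyword arguments after this commit.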
@@ -18,12 +18,12 @@
 # along with Pyrofork. If not, see <http://www.gnu.org/licenses/>.

 import html
-import logging
 import re
 from typing import Optional

 import pyrogram
 from pyrogram.enums import MessageEntityType

 from . import utils
 from .html import HTML

@@ -36,8 +36,9 @@ CODE_DELIM = "`"
 PRE_DELIM = "```"
 BLOCKQUOTE_DELIM = ">"
 BLOCKQUOTE_EXPANDABLE_DELIM = "**>"
+BLOCKQUOTE_EXPANDABLE_END_DELIM = "||"

-MARKDOWN_RE = re.compile(r"({d})".format(
+MARKDOWN_RE = re.compile(r"({d})|(!?)\[(.+?)\]\((.+?)\)".format(
     d="|".join(
         ["".join(i) for i in [
             [rf"\{j}" for j in i]
@@ -52,74 +53,123 @@ MARKDOWN_RE = re.compile(r"({d})".format(
             ]
         ]]
     )))
-URL_RE = re.compile(r"(!?)\[(.+?)\]\((.+?)\)")

 OPENING_TAG = "<{}>"
 CLOSING_TAG = "</{}>"
 URL_MARKUP = '<a href="{}">{}</a>'
-EMOJI_MARKUP = '<emoji id={}>{}</emoji>'
+EMOJI_MARKUP = "<emoji id={}>{}</emoji>"
 FIXED_WIDTH_DELIMS = [CODE_DELIM, PRE_DELIM]
-CODE_TAG_RE = re.compile(r"<code>.*?</code>")


 class Markdown:
     def __init__(self, client: Optional["pyrogram.Client"]):
         self.html = HTML(client)

-    def blockquote_parser(self, text):
-        text = re.sub(r'\n&gt;', '\n>', re.sub(r'^&gt;', '>', text))
-        lines = text.split('\n')
-        result = []
-
-        in_blockquote = False
-
-        for line in lines:
-            if line.startswith(BLOCKQUOTE_DELIM):
-                if not in_blockquote:
-                    line = re.sub(r'^> ', OPENING_TAG.format("blockquote"), line)
-                    line = re.sub(r'^>', OPENING_TAG.format("blockquote"), line)
-                    in_blockquote = True
-                    result.append(line.strip())
-                else:
-                    result.append(line[1:].strip())
-            elif line.startswith(BLOCKQUOTE_EXPANDABLE_DELIM):
-                if not in_blockquote:
-                    line = re.sub(r'^\*\*> ', OPENING_TAG.format("blockquote expandable"), line)
-                    line = re.sub(r'^\*\*>', OPENING_TAG.format("blockquote expandable"), line)
-                    in_blockquote = True
-                    result.append(line.strip())
-                else:
-                    result.append(line[3:].strip())
-            else:
-                if in_blockquote:
-                    line = CLOSING_TAG.format("blockquote") + line
-                    in_blockquote = False
-                result.append(line)
-
-        if in_blockquote:
-            line = result[len(result)-1] + CLOSING_TAG.format("blockquote")
-            result.pop(len(result)-1)
-            result.append(line)
-
-        return '\n'.join(result)
+    @staticmethod
+    def escape_and_create_quotes(text: str, strict: bool):
+        text_lines: list[str | None] = text.splitlines()
+
+        # Indexes of Already escaped lines
+        html_escaped_list: list[int] = []
+
+        # Temporary Queue to hold lines to be quoted
+        to_quote_list: list[tuple[int, str]] = []
+
+        def create_blockquote(expandable: bool = False) -> None:
+            """
+            Merges all lines in quote_queue into first line of queue
+            Encloses that line in html quote
+            Replaces rest of the lines with None placeholders to preserve indexes
+            """
+            if len(to_quote_list) == 0:
+                return
+
+            joined_lines = "\n".join([i[1] for i in to_quote_list])
+
+            first_line_index, _ = to_quote_list[0]
+            text_lines[first_line_index] = (
+                f"<blockquote{' expandable' if expandable else ''}>{joined_lines}</blockquote>"
+            )
+
+            for line_to_remove in to_quote_list[1:]:
+                text_lines[line_to_remove[0]] = None
+
+            to_quote_list.clear()
+
+        # Handle Expandable Quote
+        inside_blockquote = False
+        for index, line in enumerate(text_lines):
+            if line.startswith(BLOCKQUOTE_EXPANDABLE_DELIM) and not inside_blockquote:
+                delim_stripped_line = line[len(BLOCKQUOTE_EXPANDABLE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_EXPANDABLE_DELIM} ") else 0) :]
+                parsed_line = (
+                    html.escape(delim_stripped_line) if strict else delim_stripped_line
+                )
+
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+
+                inside_blockquote = True
+                continue
+
+            elif line.endswith(BLOCKQUOTE_EXPANDABLE_END_DELIM) and inside_blockquote:
+                if line.startswith(BLOCKQUOTE_DELIM):
+                    line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
+
+                delim_stripped_line = line[:-len(BLOCKQUOTE_EXPANDABLE_END_DELIM)]
+
+                parsed_line = (
+                    html.escape(delim_stripped_line) if strict else delim_stripped_line
+                )
+
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+
+                inside_blockquote = False
+
+                create_blockquote(expandable=True)
+
+            if inside_blockquote:
+                parsed_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
+                parsed_line = html.escape(parsed_line) if strict else parsed_line
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+
+        # Handle Single line/Continued Quote
+        for index, line in enumerate(text_lines):
+            if line is None:
+                continue
+
+            if line.startswith(BLOCKQUOTE_DELIM):
+                delim_stripped_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
+                parsed_line = (
+                    html.escape(delim_stripped_line) if strict else delim_stripped_line
+                )
+
+                to_quote_list.append((index, parsed_line))
+                html_escaped_list.append(index)
+            elif len(to_quote_list) > 0:
+                create_blockquote()
+        else:
+            create_blockquote()
+
+        if strict:
+            for idx, line in enumerate(text_lines):
+                if idx not in html_escaped_list:
+                    text_lines[idx] = html.escape(line)
+
+        return "\n".join(
+            [valid_line for valid_line in text_lines if valid_line is not None]
+        )

     async def parse(self, text: str, strict: bool = False):
-        if strict:
-            text = html.escape(text)
-        text = self.blockquote_parser(text)
+        text = self.escape_and_create_quotes(text, strict=strict)

         delims = set()
         is_fixed_width = False

-        placeholders = {}
-        for i, code_section in enumerate(CODE_TAG_RE.findall(text)):
-            placeholder = f"{{CODE_SECTION_{i}}}"
-            placeholders[placeholder] = code_section
-            text = text.replace(code_section, placeholder, 1)
-
         for i, match in enumerate(re.finditer(MARKDOWN_RE, text)):
             start, _ = match.span()
-            delim = match.group(1)
+            delim, is_emoji, text_url, url = match.groups()
             full = match.group(0)

             if delim in FIXED_WIDTH_DELIMS:
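Since `escape_and_create_quotes` is a `@staticmethod` that only transforms text, the new quote handling can be exercised in isolation. A rough sketch of the expected behaviour, assuming the class is importable from `pyrogram.parser.markdown` as in upstream Pyrogram:

```python
from pyrogram.parser.markdown import Markdown

# Consecutive "> " lines are folded into a single <blockquote> element that
# replaces the first quoted line; the other quoted lines become None and are
# dropped when the text is re-joined.
print(Markdown.escape_and_create_quotes("> first\n> second\nplain", strict=False))
# <blockquote>first
# second</blockquote>
# plain

# "**>" opens an expandable quote and a trailing "||" closes it.
print(Markdown.escape_and_create_quotes("**> hidden\n> more||", strict=False))
# <blockquote expandable>hidden
# more</blockquote>
```

With `strict=True` the quoted lines are passed through `html.escape()` as they are collected and every remaining line is escaped afterwards, which is why `parse()` no longer needs its own `html.escape(text)` call.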
@@ -128,6 +178,16 @@ class Markdown:
             if is_fixed_width and delim not in FIXED_WIDTH_DELIMS:
                 continue

+            if not is_emoji and text_url:
+                text = utils.replace_once(text, full, URL_MARKUP.format(url, text_url), start)
+                continue
+
+            if is_emoji:
+                emoji = text_url
+                emoji_id = url.lstrip("tg://emoji?id=")
+                text = utils.replace_once(text, full, EMOJI_MARKUP.format(emoji_id, emoji), start)
+                continue
+
             if delim == BOLD_DELIM:
                 tag = "b"
             elif delim == ITALIC_DELIM:
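Inline links and custom emoji are now matched by the same `MARKDOWN_RE` pass instead of a separate `URL_RE` scan, which is why `match.groups()` unpacks into four values above. A self-contained sketch of how the groups line up (the pattern is reduced to the bold delimiter only; the real constant escapes every delimiter):

```python
import re

# Reduced stand-in for the new MARKDOWN_RE: formatting delimiter OR (!?)[text](url).
pattern = re.compile(r"({d})|(!?)\[(.+?)\]\((.+?)\)".format(d=r"\*\*"))

sample = "**bold** [link](https://example.com) ![x](tg://emoji?id=42)"
print([m.groups() for m in pattern.finditer(sample)])
# (wrapped here for readability)
# [('**', None, None, None), ('**', None, None, None),
#  (None, '', 'link', 'https://example.com'),
#  (None, '!', 'x', 'tg://emoji?id=42')]
```

An empty `is_emoji` group with a non-empty `text_url` takes the `URL_MARKUP` branch, while a leading `!` routes the match to `EMOJI_MARKUP`, matching the two `continue` blocks added in this hunk.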
@@ -160,98 +220,79 @@ class Markdown:

             text = utils.replace_once(text, delim, tag, start)

-        for i, match in enumerate(re.finditer(URL_RE, text)):
-            start, _ = match.span()
-            is_emoji, text_url, url = match.groups()
-            full = match.group(0)
-
-            if not is_emoji and text_url:
-                text = utils.replace_once(text, full, URL_MARKUP.format(url, text_url), start)
-                continue
-
-            if is_emoji:
-                emoji = text_url
-                emoji_id = url.lstrip("tg://emoji?id=")
-                text = utils.replace_once(text, full, EMOJI_MARKUP.format(emoji_id, emoji), start)
-                continue
-
-        for placeholder, code_section in placeholders.items():
-            text = text.replace(placeholder, code_section)

         return await self.html.parse(text)

     @staticmethod
     def unparse(text: str, entities: list):
-        """
-        Performs the reverse operation to .parse(), effectively returning
-        markdown-like syntax given a normal text and its MessageEntity's.
-
-        :param text: the text to be reconverted into markdown.
-        :param entities: list of MessageEntity's applied to the text.
-        :return: a markdown-like text representing the combination of both inputs.
-        """
-        delimiters = {
-            MessageEntityType.BOLD: BOLD_DELIM,
-            MessageEntityType.ITALIC: ITALIC_DELIM,
-            MessageEntityType.UNDERLINE: UNDERLINE_DELIM,
-            MessageEntityType.STRIKETHROUGH: STRIKE_DELIM,
-            MessageEntityType.CODE: CODE_DELIM,
-            MessageEntityType.PRE: PRE_DELIM,
-            MessageEntityType.BLOCKQUOTE: BLOCKQUOTE_DELIM,
-            MessageEntityType.EXPANDABLE_BLOCKQUOTE: BLOCKQUOTE_EXPANDABLE_DELIM,
-            MessageEntityType.SPOILER: SPOILER_DELIM
-        }
-
         text = utils.add_surrogates(text)

-        insert_at = []
-        for i, entity in enumerate(entities):
-            s = entity.offset
-            e = entity.offset + entity.length
-            delimiter = delimiters.get(entity.type, None)
-            if delimiter:
-                if entity.type != MessageEntityType.BLOCKQUOTE and entity.type != MessageEntityType.EXPANDABLE_BLOCKQUOTE:
-                    open_delimiter = delimiter
-                    close_delimiter = delimiter
-                    if entity.type == MessageEntityType.PRE:
-                        if entity.language:
-                            open_delimiter += entity.language + '\n'
-                        else:
-                            open_delimiter += entity + '\n'
-                    insert_at.append((s, i, open_delimiter))
-                    insert_at.append((e, -i, close_delimiter))
-                else:
-                    # Handle multiline blockquotes
-                    text_subset = text[s:e]
-                    lines = text_subset.splitlines()
-                    for line_num, line in enumerate(lines):
-                        line_start = s + sum(len(l) + 1 for l in lines[:line_num])
-                        if entity.collapsed:
-                            insert_at.append((line_start, i, BLOCKQUOTE_EXPANDABLE_DELIM))
-                        else:
-                            insert_at.append((line_start, i, BLOCKQUOTE_DELIM))
-                    # No closing delimiter for blockquotes
-            else:
-                url = None
-                if entity.type == MessageEntityType.TEXT_LINK:
-                    url = entity.url
-                elif entity.type == MessageEntityType.TEXT_MENTION:
-                    url = 'tg://user?id={}'.format(entity.user.id)
-                if url:
-                    insert_at.append((s, i, '['))
-                    insert_at.append((e, -i, ']({})'.format(url)))
-
-        insert_at.sort(key=lambda t: (t[0], t[1]))
-        while insert_at:
-            at, _, what = insert_at.pop()
-
-            # If we are in the middle of a surrogate nudge the position by -1.
-            # Otherwise we would end up with malformed text and fail to encode.
-            # For example of bad input: "Hi \ud83d\ude1c"
-            # https://en.wikipedia.org/wiki/UTF-16#U+010000_to_U+10FFFF
-            while utils.within_surrogate(text, at):
-                at += 1
-
-            text = text[:at] + what + text[at:]
+        entities_offsets = []
+
+        for entity in entities:
+            entity_type = entity.type
+            start = entity.offset
+            end = start + entity.length
+
+            if entity_type == MessageEntityType.BOLD:
+                start_tag = end_tag = BOLD_DELIM
+            elif entity_type == MessageEntityType.ITALIC:
+                start_tag = end_tag = ITALIC_DELIM
+            elif entity_type == MessageEntityType.UNDERLINE:
+                start_tag = end_tag = UNDERLINE_DELIM
+            elif entity_type == MessageEntityType.STRIKETHROUGH:
+                start_tag = end_tag = STRIKE_DELIM
+            elif entity_type == MessageEntityType.CODE:
+                start_tag = end_tag = CODE_DELIM
+            elif entity_type == MessageEntityType.PRE:
+                language = getattr(entity, "language", "") or ""
+                start_tag = f"{PRE_DELIM}{language}\n"
+                end_tag = f"\n{PRE_DELIM}"
+            elif entity_type == MessageEntityType.BLOCKQUOTE:
+                start_tag = BLOCKQUOTE_DELIM + " "
+                end_tag = ""
+                blockquote_text = text[start:end]
+                lines = blockquote_text.split("\n")
+                last_length = 0
+                for line in lines:
+                    if len(line) == 0 and last_length == end:
+                        continue
+                    start_offset = start+last_length
+                    last_length = last_length+len(line)
+                    end_offset = start_offset+last_length
+                    entities_offsets.append((start_tag, start_offset,))
+                    entities_offsets.append((end_tag, end_offset,))
+                    last_length = last_length+1
+                continue
+            elif entity_type == MessageEntityType.SPOILER:
+                start_tag = end_tag = SPOILER_DELIM
+            elif entity_type == MessageEntityType.TEXT_LINK:
+                url = entity.url
+                start_tag = "["
+                end_tag = f"]({url})"
+            elif entity_type == MessageEntityType.TEXT_MENTION:
+                user = entity.user
+                start_tag = "["
+                end_tag = f"](tg://user?id={user.id})"
+            elif entity_type == MessageEntityType.CUSTOM_EMOJI:
+                emoji_id = entity.custom_emoji_id
+                start_tag = "!["
+                end_tag = f"](tg://emoji?id={emoji_id})"
+            else:
+                continue
+
+            entities_offsets.append((start_tag, start,))
+            entities_offsets.append((end_tag, end,))
+
+        entities_offsets = map(
+            lambda x: x[1],
+            sorted(
+                enumerate(entities_offsets),
+                key=lambda x: (x[1][1], x[0]),
+                reverse=True
+            )
+        )
+
+        for entity, offset in entities_offsets:
+            text = text[:offset] + entity + text[offset:]

         return utils.remove_surrogates(text)
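The rewritten `unparse()` collects `(tag, offset)` pairs and splices them in from the highest offset down, so earlier insertions can no longer shift offsets that are still pending. A simplified, self-contained illustration of that strategy (made-up tags and offsets; the real code also keeps a secondary sort key and the surrogate round-trip):

```python
text = "bold and italic"
entities_offsets = [("**", 0), ("**", 4), ("__", 9), ("__", 15)]

# Apply insertions right-to-left: each splice happens to the right of every
# offset that has not been applied yet, so those offsets stay valid.
for tag, offset in sorted(entities_offsets, key=lambda x: x[1], reverse=True):
    text = text[:offset] + tag + text[offset:]

print(text)  # **bold** and __italic__
```

Because positions are never nudged forward anymore, the `within_surrogate()` helper is no longer referenced, which is what the final hunk below removes from utils.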
@@ -40,16 +40,3 @@ def remove_surrogates(text):

 def replace_once(source: str, old: str, new: str, start: int):
     return source[:start] + source[start:].replace(old, new, 1)
-
-def within_surrogate(text, index, *, length=None):
-    """
-    `True` if ``index`` is within a surrogate (before and after it, not at!).
-    """
-    if length is None:
-        length = len(text)
-
-    return (
-        1 < index < len(text) and  # in bounds
-        '\ud800' <= text[index - 1] <= '\udbff' and  # previous is
-        '\ud800' <= text[index] <= '\udfff'  # current is
-    )