mirror of
https://github.com/yasirarism/MissKatyPyro.git
synced 2025-12-29 09:44:50 +00:00
65 lines
1.7 KiB
Python
65 lines
1.7 KiB
Python
from asyncio import gather
|
|
|
|
import httpx
|
|
from aiohttp import ClientSession
|
|
|
|
# Aiohttp Async Client
|
|
session = ClientSession()
|
|
|
|
# HTTPx Async Client
|
|
fetch = httpx.AsyncClient(
|
|
http2=True,
|
|
verify=False,
|
|
headers={
|
|
"Accept-Language": "id-ID",
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edge/107.0.1418.42",
|
|
},
|
|
timeout=httpx.Timeout(20),
|
|
)
|
|
|
|
|
|
async def get(url: str, *args, **kwargs):
|
|
async with session.get(url, *args, **kwargs) as resp:
|
|
try:
|
|
data = await resp.json()
|
|
except Exception:
|
|
data = await resp.text()
|
|
return data
|
|
|
|
|
|
async def head(url: str, *args, **kwargs):
|
|
async with session.head(url, *args, **kwargs) as resp:
|
|
try:
|
|
data = await resp.json()
|
|
except Exception:
|
|
data = await resp.text()
|
|
return data
|
|
|
|
|
|
async def post(url: str, *args, **kwargs):
|
|
async with session.post(url, *args, **kwargs) as resp:
|
|
try:
|
|
data = await resp.json()
|
|
except Exception:
|
|
data = await resp.text()
|
|
return data
|
|
|
|
|
|
async def multiget(url: str, times: int, *args, **kwargs):
|
|
return await gather(*[get(url, *args, **kwargs) for _ in range(times)])
|
|
|
|
|
|
async def multihead(url: str, times: int, *args, **kwargs):
|
|
return await gather(*[head(url, *args, **kwargs) for _ in range(times)])
|
|
|
|
|
|
async def multipost(url: str, times: int, *args, **kwargs):
|
|
return await gather(*[post(url, *args, **kwargs) for _ in range(times)])
|
|
|
|
|
|
async def resp_get(url: str, *args, **kwargs):
|
|
return await session.get(url, *args, **kwargs)
|
|
|
|
|
|
async def resp_post(url: str, *args, **kwargs):
|
|
return await session.post(url, *args, **kwargs)
|