Files
s5p/tests/test_http.py
user 903cb38b9f feat: async HTTP client and parallel source fetching
Replace blocking urllib with a minimal async HTTP/1.1 client (http.py)
using asyncio streams. Pool source fetches now run in parallel via
asyncio.gather. Dead proxy reporting uses async POST. Handles
Content-Length, chunked transfer-encoding, and connection-close bodies.
No new dependencies.
2026-02-15 17:55:56 +01:00

154 lines
5.0 KiB
Python

"""Tests for the async HTTP client."""
import asyncio
import json
import pytest
from s5p.http import http_get_json, http_post_json
async def _run_mock_server(handler, host="127.0.0.1", port=0):
"""Start a mock TCP server, return (server, port)."""
server = await asyncio.start_server(handler, host, port)
port = server.sockets[0].getsockname()[1]
return server, port
class TestHttpGetJson:
"""Test async HTTP GET."""
def test_basic_get(self):
payload = {"proxies": [{"proto": "socks5", "proxy": "1.2.3.4:1080"}]}
body = json.dumps(payload).encode()
async def handler(reader, writer):
await reader.readline() # request line
while (await reader.readline()) not in (b"\r\n", b"\n", b""):
pass
writer.write(
f"HTTP/1.1 200 OK\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n"
f"\r\n".encode()
+ body
)
await writer.drain()
writer.close()
async def run():
server, port = await _run_mock_server(handler)
async with server:
result = await http_get_json(f"http://127.0.0.1:{port}/test")
assert result == payload
asyncio.run(run())
def test_chunked_response(self):
payload = {"status": "ok"}
body = json.dumps(payload).encode()
async def handler(reader, writer):
await reader.readline()
while (await reader.readline()) not in (b"\r\n", b"\n", b""):
pass
writer.write(
b"HTTP/1.1 200 OK\r\n"
b"Transfer-Encoding: chunked\r\n"
b"Connection: close\r\n"
b"\r\n"
)
# Send body as a single chunk
writer.write(f"{len(body):x}\r\n".encode() + body + b"\r\n")
writer.write(b"0\r\n\r\n")
await writer.drain()
writer.close()
async def run():
server, port = await _run_mock_server(handler)
async with server:
result = await http_get_json(f"http://127.0.0.1:{port}/chunked")
assert result == payload
asyncio.run(run())
def test_error_status(self):
async def handler(reader, writer):
await reader.readline()
while (await reader.readline()) not in (b"\r\n", b"\n", b""):
pass
writer.write(b"HTTP/1.1 500 Internal Server Error\r\nConnection: close\r\n\r\n")
await writer.drain()
writer.close()
async def run():
server, port = await _run_mock_server(handler)
async with server:
with pytest.raises(OSError, match="HTTP 500"):
await http_get_json(f"http://127.0.0.1:{port}/fail")
asyncio.run(run())
def test_connection_close_body(self):
"""Server sends no Content-Length, just closes connection."""
payload = {"key": "value"}
body = json.dumps(payload).encode()
async def handler(reader, writer):
await reader.readline()
while (await reader.readline()) not in (b"\r\n", b"\n", b""):
pass
writer.write(
b"HTTP/1.1 200 OK\r\n"
b"Connection: close\r\n"
b"\r\n"
+ body
)
await writer.drain()
writer.close()
await writer.wait_closed()
async def run():
server, port = await _run_mock_server(handler)
async with server:
result = await http_get_json(f"http://127.0.0.1:{port}/nosize")
assert result == payload
asyncio.run(run())
class TestHttpPostJson:
"""Test async HTTP POST."""
def test_basic_post(self):
received = {}
async def handler(reader, writer):
request_line = await reader.readline()
received["method"] = request_line.decode().split()[0]
content_length = 0
while True:
line = await reader.readline()
if line in (b"\r\n", b"\n", b""):
break
header = line.decode().lower()
if header.startswith("content-length:"):
content_length = int(header.split(":", 1)[1].strip())
body = await reader.readexactly(content_length)
received["body"] = json.loads(body)
writer.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n")
await writer.drain()
writer.close()
async def run():
server, port = await _run_mock_server(handler)
async with server:
await http_post_json(
f"http://127.0.0.1:{port}/report",
{"dead": [{"proto": "socks5", "proxy": "1.2.3.4:1080"}]},
)
assert received["method"] == "POST"
assert received["body"]["dead"][0]["proto"] == "socks5"
asyncio.run(run())