- Bump version 0.1.0 -> 0.3.0
- Add systemd service unit (config/s5p.service) and install-service Makefile target
- Add CLI argument parsing tests (tests/test_cli.py, 27 tests)
- Expand protocol tests with SOCKS5/4/HTTP handshake, error, and auth coverage (tests/test_proto.py, 30 tests)
- Add full API reference to docs/USAGE.md with response schemas for all GET/POST endpoints
- Update INSTALL.md and CHEATSHEET.md with a systemd section
- Update ROADMAP.md and TASKS.md for v0.3.0
336 lines
9.8 KiB
Python
336 lines
9.8 KiB
Python
"""Tests for protocol helpers."""
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from s5p.proto import (
|
|
ProtoError,
|
|
Socks5AddrType,
|
|
Socks5Reply,
|
|
encode_address,
|
|
http_connect,
|
|
socks4_connect,
|
|
socks5_connect,
|
|
)
|
|
|
|
# -- helpers -----------------------------------------------------------------
|
|
|
|
|
|
class _MockTransport(asyncio.Transport):
|
|
"""Minimal transport that captures writes and supports drain."""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.written = bytearray()
|
|
self._closing = False
|
|
|
|
def write(self, data):
|
|
self.written.extend(data)
|
|
|
|
def is_closing(self):
|
|
return self._closing
|
|
|
|
def close(self):
|
|
self._closing = True
|
|
|
|
def get_extra_info(self, name, default=None):
|
|
return default
|
|
|
|
|
|
def _make_streams(response_data: bytes):
    """Build a ``(reader, writer)`` pair backed by canned bytes.

    The reader is pre-loaded with ``response_data`` followed by EOF, and the
    writer is wired to a ``_MockTransport`` so nothing touches the network.

    Must be called from within a running event loop.
    """
    # Queue the scripted reply (and EOF) before any transport is attached.
    stream_reader = asyncio.StreamReader()
    stream_reader.feed_data(response_data)
    stream_reader.feed_eof()

    sink = _MockTransport()
    stream_protocol = asyncio.StreamReaderProtocol(stream_reader)
    stream_protocol.connection_made(sink)

    stream_writer = asyncio.StreamWriter(
        sink,
        stream_protocol,
        stream_reader,
        asyncio.get_running_loop(),
    )
    return stream_reader, stream_writer
|
|
|
|
|
|
def _run(coro):
|
|
"""Run a coroutine in a fresh event loop."""
|
|
asyncio.run(coro)
|
|
|
|
|
|
# -- encode_address ----------------------------------------------------------
|
|
|
|
|
|
class TestEncodeAddress:
    """Test SOCKS5 address encoding.

    ``encode_address`` returns an ``(address type, payload)`` pair. IPv4 and
    IPv6 payloads are compared against their full packed form; domain
    payloads against a length byte followed by the name.
    """

    def test_ipv4(self):
        """Loopback IPv4 encodes to its four packed octets."""
        atyp, data = encode_address("127.0.0.1")
        assert atyp == Socks5AddrType.IPV4
        assert data == b"\x7f\x00\x00\x01"

    def test_ipv4_zeros(self):
        """The all-zero IPv4 address encodes to four zero octets."""
        atyp, data = encode_address("0.0.0.0")
        assert atyp == Socks5AddrType.IPV4
        assert data == b"\x00\x00\x00\x00"

    def test_ipv6(self):
        """Loopback IPv6 encodes to 15 zero octets followed by 0x01."""
        atyp, data = encode_address("::1")
        assert atyp == Socks5AddrType.IPV6
        # Pin every octet, not just the length and the last byte.
        assert data == b"\x00" * 15 + b"\x01"

    def test_ipv6_full(self):
        """A compressed IPv6 address expands to its full 16 octets."""
        atyp, data = encode_address("2001:db8::1")
        assert atyp == Socks5AddrType.IPV6
        # A length-only check would accept any 16-byte value.
        assert data == bytes.fromhex("20010db8000000000000000000000001")

    def test_domain(self):
        """A domain name is prefixed with its one-byte length."""
        atyp, data = encode_address("example.com")
        assert atyp == Socks5AddrType.DOMAIN
        assert data == bytes([11]) + b"example.com"

    def test_domain_short(self):
        """A short domain name gets the matching short length prefix."""
        atyp, data = encode_address("a.co")
        assert atyp == Socks5AddrType.DOMAIN
        assert data == bytes([4]) + b"a.co"

    def test_domain_long(self):
        """The length prefix tracks the name for multi-label domains."""
        host = "sub.domain.example.com"
        atyp, data = encode_address(host)
        assert atyp == Socks5AddrType.DOMAIN
        assert data[0] == len(host)
        assert data[1:] == host.encode("ascii")
|
|
|
|
|
|
# -- socks5_connect ----------------------------------------------------------
|
|
|
|
|
|
class TestSocks5Connect:
    """Test SOCKS5 handshake building."""

    # Tail of a connect reply: address type 0x01 (IPv4), 0.0.0.0, port 0.
    _BIND_ADDR = b"\x01\x00\x00\x00\x00\x00\x00"

    @staticmethod
    def _handshake(reply, *credentials, fails_with=None):
        """Drive ``socks5_connect`` to example.com:80 against canned bytes.

        ``reply`` is everything the mock server sends back. When
        ``fails_with`` is given, the call must raise ``ProtoError`` whose
        message matches that pattern; otherwise it must complete.
        """

        async def scenario():
            reader, writer = _make_streams(reply)
            if fails_with is None:
                await socks5_connect(reader, writer, "example.com", 80, *credentials)
            else:
                with pytest.raises(ProtoError, match=fails_with):
                    await socks5_connect(reader, writer, "example.com", 80, *credentials)

        _run(scenario())

    def test_no_auth_success(self):
        """Successful SOCKS5 connect without auth."""
        self._handshake(b"\x05\x00" + b"\x05\x00\x00" + self._BIND_ADDR)

    def test_auth_success(self):
        """Successful SOCKS5 connect with username/password auth."""
        reply = b"\x05\x02" + b"\x01\x00" + b"\x05\x00\x00" + self._BIND_ADDR
        self._handshake(reply, "user", "pass")

    def test_auth_failure(self):
        """SOCKS5 auth rejected by server."""
        reply = b"\x05\x02" + b"\x01\x01"
        self._handshake(reply, "user", "bad", fails_with="authentication failed")

    def test_no_acceptable_methods(self):
        """Server rejects all auth methods (0xFF)."""
        self._handshake(b"\x05\xff", fails_with="no acceptable")

    def test_connect_refused(self):
        """SOCKS5 connect reply with connection refused."""
        reply = b"\x05\x00" + b"\x05\x05\x00" + self._BIND_ADDR
        self._handshake(reply, fails_with="connect failed")

    def test_wrong_version(self):
        """Server responds with wrong SOCKS version."""
        self._handshake(b"\x04\x00", fails_with="unexpected version")

    def test_server_requires_auth_no_creds(self):
        """Server demands auth but no credentials provided."""
        self._handshake(b"\x05\x02", fails_with="requires auth")
|
|
|
|
|
|
# -- socks4_connect ----------------------------------------------------------
|
|
|
|
|
|
class TestSocks4Connect:
    """Test SOCKS4/4a request building."""

    # Six zero bytes that follow the two-byte status in every canned reply.
    _REPLY_TAIL = b"\x00" * 6

    @staticmethod
    def _request(reply, host, fails_with=None):
        """Drive ``socks4_connect`` to ``host`` on port 80 against canned bytes.

        When ``fails_with`` is given, the call must raise ``ProtoError``
        whose message matches that pattern; otherwise it must complete.
        """

        async def scenario():
            reader, writer = _make_streams(reply)
            if fails_with is None:
                await socks4_connect(reader, writer, host, 80)
            else:
                with pytest.raises(ProtoError, match=fails_with):
                    await socks4_connect(reader, writer, host, 80)

        _run(scenario())

    def test_ip_success(self):
        """SOCKS4 connect with IP address."""
        self._request(b"\x00\x5a" + self._REPLY_TAIL, "1.2.3.4")

    def test_domain_success(self):
        """SOCKS4a connect with domain name."""
        self._request(b"\x00\x5a" + self._REPLY_TAIL, "example.com")

    def test_rejected(self):
        """SOCKS4 request rejected."""
        self._request(b"\x00\x5b" + self._REPLY_TAIL, "1.2.3.4", fails_with="rejected")
|
|
|
|
|
|
# -- http_connect ------------------------------------------------------------
|
|
|
|
|
|
class TestHttpConnect:
    """Test HTTP CONNECT request building."""

    @staticmethod
    def _tunnel(reply, *credentials, fails_with=None):
        """Drive ``http_connect`` to example.com:443 against canned bytes.

        ``reply`` is the raw proxy response. When ``fails_with`` is given,
        the call must raise ``ProtoError`` whose message matches that
        pattern; otherwise it must complete.
        """

        async def scenario():
            reader, writer = _make_streams(reply)
            if fails_with is None:
                await http_connect(reader, writer, "example.com", 443, *credentials)
            else:
                with pytest.raises(ProtoError, match=fails_with):
                    await http_connect(reader, writer, "example.com", 443, *credentials)

        _run(scenario())

    def test_success_200(self):
        """HTTP CONNECT with 200 response."""
        self._tunnel(b"HTTP/1.1 200 Connection Established\r\n\r\n")

    def test_success_with_headers(self):
        """HTTP CONNECT with extra headers in response."""
        self._tunnel(b"HTTP/1.1 200 OK\r\nX-Proxy: test\r\n\r\n")

    def test_auth_success(self):
        """HTTP CONNECT with proxy authentication."""
        self._tunnel(b"HTTP/1.1 200 OK\r\n\r\n", "user", "pass")

    def test_forbidden(self):
        """HTTP CONNECT with 403 response."""
        self._tunnel(b"HTTP/1.1 403 Forbidden\r\n\r\n", fails_with="connect failed")

    def test_proxy_auth_required(self):
        """HTTP CONNECT with 407 response."""
        self._tunnel(
            b"HTTP/1.1 407 Proxy Authentication Required\r\n\r\n",
            fails_with="connect failed",
        )

    def test_empty_response(self):
        """HTTP CONNECT with empty response."""
        self._tunnel(b"", fails_with="empty response")
|
|
|
|
|
|
# -- Socks5Reply enum -------------------------------------------------------
|
|
|
|
|
|
class TestSocks5Reply:
    """Test SOCKS5 reply code values."""

    def test_succeeded(self):
        """SUCCEEDED compares equal to 0x00."""
        code = Socks5Reply.SUCCEEDED
        assert code == 0x00

    def test_general_failure(self):
        """GENERAL_FAILURE compares equal to 0x01."""
        code = Socks5Reply.GENERAL_FAILURE
        assert code == 0x01

    def test_connection_refused(self):
        """CONNECTION_REFUSED compares equal to 0x05."""
        code = Socks5Reply.CONNECTION_REFUSED
        assert code == 0x05

    def test_command_not_supported(self):
        """COMMAND_NOT_SUPPORTED compares equal to 0x07."""
        code = Socks5Reply.COMMAND_NOT_SUPPORTED
        assert code == 0x07

    def test_address_type_not_supported(self):
        """ADDRESS_TYPE_NOT_SUPPORTED compares equal to 0x08."""
        code = Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED
        assert code == 0x08
|
|
|
|
|
|
# -- ProtoError --------------------------------------------------------------
|
|
|
|
|
|
class TestProtoError:
    """Test ProtoError exception."""

    def test_default_reply(self):
        """Without an explicit reply, the error carries GENERAL_FAILURE."""
        exc = ProtoError("test error")
        assert str(exc) == "test error"
        assert exc.reply == Socks5Reply.GENERAL_FAILURE

    def test_custom_reply(self):
        """A reply passed as the second argument is exposed as ``reply``."""
        exc = ProtoError("refused", Socks5Reply.CONNECTION_REFUSED)
        assert exc.reply == Socks5Reply.CONNECTION_REFUSED
|