feat: stealth connect with random identity and probation window

Register with a fully random nick, user, and realname (no fixed
pattern) to avoid fingerprinting. Enter a 15s probation period
after registration -- if the server k-lines, reconnect with a
fresh identity. Only after surviving probation: switch to the
configured nick and join channels.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-19 18:05:54 +01:00
parent 41ba680dcb
commit 86832b8fe5
2 changed files with 170 additions and 26 deletions

View File

@@ -11,13 +11,15 @@ replay_on_connect = true
host = "127.0.0.1"
port = 1080
# Registration uses a random nick and generic ident/realname.
# After surviving the probation window (no k-line), the bouncer
# switches to your configured nick and joins channels.
[networks.libera]
host = "irc.libera.chat"
port = 6697
tls = true
nick = "mynick"
user = "mynick"
realname = "bouncer user"
channels = ["#test"]
autojoin = true

View File

@@ -4,6 +4,9 @@ from __future__ import annotations
import asyncio
import logging
import random
import string
from enum import Enum, auto
from typing import Callable
from bouncer.config import NetworkConfig, ProxyConfig
@@ -12,6 +15,43 @@ from bouncer.irc import IRCMessage, parse
log = logging.getLogger(__name__)
BACKOFF_STEPS = [5, 10, 30, 60, 120, 300]
PROBATION_SECONDS = 15
class State(Enum):
"""Network connection state."""
DISCONNECTED = auto()
CONNECTING = auto()
REGISTERING = auto()
PROBATION = auto()
READY = auto()
def _random_nick() -> str:
"""Generate a nick that looks like a typical human-chosen IRC nick."""
# Mix of patterns seen on real IRC networks
length = random.randint(6, 10)
# Start with a letter, rest is alphanumeric
first = random.choice(string.ascii_lowercase)
rest = "".join(random.choices(string.ascii_lowercase + string.digits, k=length - 1))
return first + rest
def _random_user() -> str:
"""Generate a generic-looking ident."""
length = random.randint(4, 8)
first = random.choice(string.ascii_lowercase)
rest = "".join(random.choices(string.ascii_lowercase, k=length - 1))
return first + rest
def _random_realname() -> str:
"""Generate a plausible realname."""
length = random.randint(4, 8)
first = random.choice(string.ascii_uppercase)
rest = "".join(random.choices(string.ascii_lowercase, k=length - 1))
return first + rest
class Network:
@@ -28,17 +68,33 @@ class Network:
self.on_message = on_message
self.nick: str = cfg.nick
self.channels: set[str] = set()
self.connected: bool = False
self.registered: bool = False
self.state: State = State.DISCONNECTED
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._reconnect_attempt: int = 0
self._running: bool = False
self._read_task: asyncio.Task[None] | None = None
self._probation_task: asyncio.Task[None] | None = None
# Transient nick used during registration/probation
self._connect_nick: str = ""
# Visible hostname reported by server
self.visible_host: str | None = None
# Channel state: topic + names per channel
self.topics: dict[str, str] = {}
self.names: dict[str, set[str]] = {}
@property
def connected(self) -> bool:
return self.state not in (State.DISCONNECTED, State.CONNECTING)
@property
def registered(self) -> bool:
return self.state in (State.PROBATION, State.READY)
@property
def ready(self) -> bool:
return self.state == State.READY
async def start(self) -> None:
"""Start the network connection loop."""
self._running = True
@@ -49,6 +105,8 @@ class Network:
self._running = False
if self._read_task and not self._read_task.done():
self._read_task.cancel()
if self._probation_task and not self._probation_task.done():
self._probation_task.cancel()
await self._disconnect()
async def send(self, msg: IRCMessage) -> None:
@@ -62,9 +120,13 @@ class Network:
await self.send(IRCMessage(command=command, params=list(params)))
async def _connect(self) -> None:
"""Establish connection via SOCKS5 proxy and register."""
"""Establish connection via SOCKS5 proxy and register with random nick."""
from bouncer.proxy import connect
self.state = State.CONNECTING
self._connect_nick = _random_nick()
self.visible_host = None
try:
log.info(
"[%s] connecting to %s:%d (tls=%s)",
@@ -76,31 +138,31 @@ class Network:
self.proxy_cfg,
tls=self.cfg.tls,
)
self.connected = True
self.state = State.REGISTERING
self._reconnect_attempt = 0
log.info("[%s] connected", self.cfg.name)
log.info("[%s] connected, registering as %s", self.cfg.name, self._connect_nick)
# IRC registration
# IRC registration with generic identity
if self.cfg.password:
await self.send_raw("PASS", self.cfg.password)
await self.send_raw("NICK", self.cfg.nick)
await self.send_raw(
"USER", self.cfg.user, "0", "*", self.cfg.realname,
)
await self.send_raw("NICK", self._connect_nick)
await self.send_raw("USER", _random_user(), "0", "*", _random_realname())
# Start reading
self._read_task = asyncio.create_task(self._read_loop())
except Exception:
log.exception("[%s] connection failed", self.cfg.name)
self.connected = False
self.state = State.DISCONNECTED
if self._running:
await self._schedule_reconnect()
async def _disconnect(self) -> None:
"""Close the connection."""
self.connected = False
self.registered = False
self.state = State.DISCONNECTED
if self._probation_task and not self._probation_task.done():
self._probation_task.cancel()
self._probation_task = None
if self._writer and not self._writer.is_closing():
try:
self._writer.close()
@@ -127,7 +189,7 @@ class Network:
assert self._reader is not None
buf = b""
try:
while self._running and self.connected:
while self._running and self.state != State.DISCONNECTED:
data = await self._reader.read(4096)
if not data:
log.warning("[%s] server closed connection", self.cfg.name)
@@ -153,20 +215,92 @@ class Network:
if self._running:
await self._schedule_reconnect()
async def _enter_probation(self) -> None:
"""Start probation period after registration. Survive = ready."""
self.state = State.PROBATION
log.info(
"[%s] probation started (%ds), watching for k-line...",
self.cfg.name, PROBATION_SECONDS,
)
self._probation_task = asyncio.create_task(self._probation_timer())
async def _probation_timer(self) -> None:
"""Wait out the probation period, then transition to ready."""
try:
await asyncio.sleep(PROBATION_SECONDS)
except asyncio.CancelledError:
return
if self.state != State.PROBATION:
return
log.info("[%s] probation passed, connection stable", self.cfg.name)
await self._go_ready()
async def _go_ready(self) -> None:
"""Transition to ready: switch to desired nick, then join channels."""
self.state = State.READY
# Switch from random nick to desired nick
if self._connect_nick != self.cfg.nick:
log.info("[%s] switching nick: %s -> %s", self.cfg.name, self.nick, self.cfg.nick)
await self.send_raw("NICK", self.cfg.nick)
# nick will be updated when we get the NICK confirmation from server
# Join configured channels
if self.cfg.autojoin and self.cfg.channels:
for ch in self.cfg.channels:
await self.send_raw("JOIN", ch)
async def _handle(self, msg: IRCMessage) -> None:
"""Handle an IRC message from the server."""
if msg.command == "PING":
await self.send_raw("PONG", *msg.params)
return
if msg.command == "ERROR":
reason = msg.params[0] if msg.params else "unknown"
log.warning("[%s] server ERROR: %s", self.cfg.name, reason)
# Connection will be closed by server; read_loop handles reconnect
return
if msg.command == "001":
# RPL_WELCOME - registration complete
self.registered = True
self.nick = msg.params[0] if msg.params else self.cfg.nick
self.nick = msg.params[0] if msg.params else self._connect_nick
log.info("[%s] registered as %s", self.cfg.name, self.nick)
if self.cfg.autojoin and self.cfg.channels:
for ch in self.cfg.channels:
await self.send_raw("JOIN", ch)
# Extract hostname from welcome text: "Welcome ... nick!user@host"
if msg.params and len(msg.params) >= 2:
welcome = msg.params[-1]
if "!" in welcome and "@" in welcome:
hostmask = welcome.rsplit(" ", 1)[-1]
if "@" in hostmask:
self.visible_host = hostmask.split("@", 1)[1]
log.info("[%s] visible host: %s", self.cfg.name, self.visible_host)
await self._enter_probation()
elif msg.command == "396":
# RPL_VISIBLEHOST - displayed host changed
if len(msg.params) >= 2:
self.visible_host = msg.params[1]
log.info("[%s] visible host (396): %s", self.cfg.name, self.visible_host)
elif msg.command == "NOTICE" and msg.params:
# Extract hostname from server notices during connect
text = msg.params[-1] if msg.params else ""
if "Found your hostname" in text:
# "*** Found your hostname: some.host.example.com"
parts = text.rsplit(": ", 1)
if len(parts) == 2:
self.visible_host = parts[1].strip()
log.info("[%s] visible host (notice): %s", self.cfg.name, self.visible_host)
elif msg.command == "NICK" and msg.prefix:
# Nick change confirmation
old_nick = msg.prefix.split("!")[0]
new_nick = msg.params[0] if msg.params else old_nick
if old_nick == self.nick:
log.info("[%s] nick changed: %s -> %s", self.cfg.name, old_nick, new_nick)
self.nick = new_nick
elif msg.command == "JOIN" and msg.prefix:
nick = msg.prefix.split("!")[0]
@@ -201,10 +335,18 @@ class Network:
pass
elif msg.command == "433":
# ERR_NICKNAMEINUSE - append underscore
self.nick = self.nick + "_"
await self.send_raw("NICK", self.nick)
log.warning("[%s] nick in use, trying %s", self.cfg.name, self.nick)
# ERR_NICKNAMEINUSE
if self.state == State.READY:
# Already ready, desired nick taken -- append underscore
self.nick = self.nick + "_"
await self.send_raw("NICK", self.nick)
log.warning("[%s] nick in use, trying %s", self.cfg.name, self.nick)
else:
# Still in registration/probation, try another random nick
self._connect_nick = _random_nick()
self.nick = self._connect_nick
await self.send_raw("NICK", self._connect_nick)
log.warning("[%s] nick in use, trying %s", self.cfg.name, self._connect_nick)
elif msg.command == "KICK" and msg.params:
channel = msg.params[0]
@@ -214,7 +356,7 @@ class Network:
log.warning("[%s] kicked from %s", self.cfg.name, channel)
# Rejoin after a brief delay
await asyncio.sleep(3)
if channel in {c for c in self.cfg.channels} and self._running:
if channel in set(self.cfg.channels) and self._running and self.ready:
await self.send_raw("JOIN", channel)
# Forward to router