Xer0x's Underground

Chatter: Fake TLS, Real Chaos



License

Disclaimer


This post has been made as my notes, even though I attempt to explain what I have setup/built and how, I do not owe anyone any explanation. Do NOT expect anything.


My blog is my garden.


🚨 Important! Complete Source Code is not attached due to
fear of abuse
.


The information, code, and techniques presented in this tutorial are intended solely for educational purposes and security research. By using this material, you acknowledge that you are fully responsible for how you apply the knowledge gained and agree not to use it for any illegal, malicious, or unauthorized activities.


No Liability for Harmful Activities


The author of this blog disclaims all liability for any use, misuse, or abuse of the information provided. The tutorial does not endorse, support, or encourage any form of black hat hacking/cracking, cybercrime, or malicious activity. You, as the reader, are solely responsible for ensuring that you comply with all local, national, and international laws before applying any of the techniques shown.


Compliance with International and Indian Laws


This tutorial must not be used to:



The reader agrees to comply with all applicable laws, including but not limited to:



Any attempt to use this tutorial for illegal activities is prohibited. The author will not be held liable for any damages, legal actions, or penalties resulting from the misuse of the information presented.


Ethical Use Only


This tutorial is designed to foster ethical hacking and security research only. It should be used to understand vulnerabilities, improve security, and contribute positively to the field of cybersecurity. Engaging in any unauthorized activities, including the creation, distribution, or use of malware, is strictly forbidden.


By continuing to use this tutorial/blog/resource/page, the reader acknowledges and accepts the responsibility for their actions and the consequences thereof.




Intro


I’m back with another project that’s been eating up my coding hours: Chatter, a kinda-sorta stealthy server-client chat system designed to keep your communications secure and under the radar (that is the vision at least). If you’ve ever wanted a chat platform that’s probably robust, could-be private, and a bit sneaky, this might be it. Let’s see what makes Chatter tick.


What is Chatter?


Chatter is a kinda secure, asynchronous chat system built in Python using asyncio for real-time communication. It’s designed to mimic the look of TLS 1.2 traffic while adding layers of obfuscation to make it harder to detect. It supports private messages, group chats, and admin controls.


Here’s the breakdown of its core components:



Key Features


Here’s a quick rundown of what Chatter can do, with a comparison to a “typical” chat system for context:


FeatureChatter StatusTypical Chat SystemNotes
Private Messages✅ Done✅ DoneSend secure PMs to other users.
Group Chats✅ Done✅ DonePublic or private groups with invite or request-based membership.
Invite-Only Registration✅ Done❌ Not CommonRequires a valid invite code to join, limiting access.
TLS 1.2 Traffic Mimicry✅ Done❌ Not CommonCustom protocol blends into HTTPS-like traffic.
XOR Obfuscation✅ Done❌ RarePayloads are XOR’d with a complex key for stealth.
Zlib Compression✅ Done✅ CommonReduces payload size before obfuscation.
Rate Limiting✅ Done✅ CommonPrevents abuse with per-IP limits (2 reqs/30s).
Fingerprint-Based Bans✅ Done❌ RareBans users by unique session fingerprints.
Admin Controls✅ Done✅ CommonGenerate invites, ban/kick users, manage IPs, and more.
SQLite Backend✅ Done✅ CommonStores users, groups, invites, and bans persistently.
Proper End-to-End Encryption❌ Not Done✅ CommonPlanned for future versions.
GUI Client❌ Not Done✅ CommonCurrently CLI-only; GUI is on the roadmap.
Add a Real Handshake Phase (Before Payloads)❌ Not Done✅ CommonSimulate ClientHello and ServerHello before sending data.
Use Key Exchange (even if simplified)❌ Not Done✅ CommonNegotiate a random session key per connection.
Stronger Cipher than XOR❌ Not Done✅ CommonReplace XOR with AES (e.g., AES-128-CBC) or something similar after compression.
HMAC for Record Integrity❌ Not Done✅ CommonAdd HMAC-SHA256 for payload integrity checks.
Version & Content Type Fields in Header❌ Not Done✅ CommonInclude TLS-like version and type fields in your 5-byte header.
Session State❌ Not Done✅ CommonCache session keys to simulate TLS session resumption.
Random Nonces in Handshake❌ Not Done✅ CommonExchange random 32-byte nonces during handshake like real TLS.



Protocol Deep-dive


Let's try to undertand how the protocol works.


Chatter’s custom protocol (chat_protocol.py) is designed to look like TLS 1.2 traffic while hiding the real payload. Messages are compressed with zlib, XOR’d with a 32-byte key, and wrapped in a TLS-like header/fields.


This is a high level diagram of how the server works:


[Client]                             [Server]
   |                                     |
   |----(1) Create payload (e.g. login)-->|
   |                                     |
   | (2) Compress payload (zlib)          |
   | (3) XOR encrypt with XOR_KEY         |
   | (4) Wrap in TLS1.2-like Record       |
   |      + TLS Header (5 bytes)          |
   |      + Handshake Header (4 bytes)    |
   |      + Obfuscated Payload            |
   |                                     |
   |-----(5) Send TLS-like packet-------->|
   |                                     |
   |<----(6) Server receives packet-------|
   | (7) Parse TLS Header                 |
   | (8) Parse Handshake Header           |
   | (9) XOR decrypt payload              |
   |(10) Decompress payload (zlib)         |
   |(11) Interpret JSON message           |
   |                                     |
   | [Server processes request]           |
   |                                     |
   | (12) Server builds response payload  |
   | (13) Compress + XOR obfuscate again  |
   | (14) Wrap in TLS1.2-like Record      |
   |                                     |
   |<----(15) Send TLS-like response------|
   |                                     |
   | (16) Client parses like steps 7-11   |
   |                                     |

If the above does not make it clear to you this might :


+-------------------+
| Client Message    |
| (JSON)            |
+-------------------+
         |
         v
+-------------------+
| Zlib Compression  |
+-------------------+
         |
         v
+-------------------+
| XOR Obfuscation   |
| (32-byte key)     |
+-------------------+
         |
         v
+-------------------+
| TLS 1.2-like      |
| Header            |
| (Content Type 23) |
+-------------------+
         |
         v
[ Sent over TCP ]


chat_protocol.py


Imports


import struct
import zlib
import hashlib
from datetime import datetime


XOR Key


XOR_KEY = bytes.fromhex("...")


Think of XOR here like putting a "mask" on the data — it's not strong encryption, just enough to hide the content casually.


The TLS12MimicProtocol Class


This class handles everything about creating and parsing these fake TLS packets.


static variables:


CONTENT_TYPE_HANDSHAKE = 22
CONTENT_TYPE_APPLICATION = 23
VERSION_TLS_1_2 = 0x0303


class TLS12MimicProtocol:
    # TLS 1.2-like header structure
    CONTENT_TYPE_HANDSHAKE = 22
    CONTENT_TYPE_APPLICATION = 23
    VERSION_TLS_1_2 = 0x0303
    
    @staticmethod
    def create_tls_header(content_type, length):
        # Mimic TLS 1.2 record layer
        return struct.pack('!BHH', content_type, TLS12MimicProtocol.VERSION_TLS_1_2, length)
    
    @staticmethod
    def create_handshake_header(handshake_type, length):
        # Mimic TLS handshake header
        return struct.pack('!B3s', handshake_type, length.to_bytes(3, 'big'))
    
    @staticmethod
    def obfuscate_payload(payload):
        # Compress with LZ
        compressed = zlib.compress(payload)
        
        # XOR obfuscation
        key_length = len(XOR_KEY)
        result = bytearray()
        for i, byte in enumerate(compressed):
            result.append(byte ^ XOR_KEY[i % key_length])
        return bytes(result)
    
    @staticmethod
    def deobfuscate_payload(payload):
        # XOR deobfuscation
        key_length = len(XOR_KEY)
        result = bytearray()
        for i, byte in enumerate(payload):
            result.append(byte ^ XOR_KEY[i % key_length])
        
        # Decompress
        return zlib.decompress(bytes(result))
    
    @staticmethod
    def create_packet(handshake_type, payload):
        # Create TLS-like packet
        obfuscated = TLS12MimicProtocol.obfuscate_payload(payload)
        handshake_header = TLS12MimicProtocol.create_handshake_header(handshake_type, len(obfuscated))
        tls_header = TLS12MimicProtocol.create_tls_header(
            TLS12MimicProtocol.CONTENT_TYPE_APPLICATION, 
            len(handshake_header) + len(obfuscated)
        )
        return tls_header + handshake_header + obfuscated
    
    @staticmethod
    def parse_packet(data):
        if len(data) < 5:
            return None, None
            
        content_type, version, length = struct.unpack('!BHH', data[:5])
        if content_type != TLS12MimicProtocol.CONTENT_TYPE_APPLICATION:
            return None, None
            
        handshake_type = data[5]
        payload = TLS12MimicProtocol.deobfuscate_payload(data[9:])
        return handshake_type, payload

Key Methods:




server.py


Imports


import asyncio
import sqlite3
import json
import time
import uuid
from collections import defaultdict
from chat_protocol import TLS12MimicProtocol, generate_fingerprint
from datetime import datetime


ChatServer Class



class ChatServer:
    def __init__(self):
        self.clients = {}
        self.groups = {}
        self.rate_limits = defaultdict(list)
        self.login_attempts = defaultdict(list)
        self.db = sqlite3.connect('chat.db', check_same_thread=False)
        self.init_db()
        
    def init_db(self):
        cursor = self.db.cursor()
        cursor.executescript('''
            CREATE TABLE IF NOT EXISTS users (
                username TEXT PRIMARY KEY,
                password TEXT,
                invite_code TEXT,
                is_admin INTEGER DEFAULT 0
            );
            CREATE TABLE IF NOT EXISTS groups (
                group_id TEXT PRIMARY KEY,
                name TEXT,
                is_public INTEGER,
                owner TEXT
            );
            CREATE TABLE IF NOT EXISTS group_members (
                group_id TEXT,
                username TEXT,
                FOREIGN KEY(group_id) REFERENCES groups(group_id),
                FOREIGN KEY(username) REFERENCES users(username)
            );
            CREATE TABLE IF NOT EXISTS invites (
                code TEXT PRIMARY KEY,
                used_by TEXT,
                created_by TEXT
            );
            CREATE TABLE IF NOT EXISTS bans (
                type TEXT,
                value TEXT,
                fingerprint TEXT
            );
            CREATE TABLE IF NOT EXISTS fingerprints (
                fingerprint TEXT PRIMARY KEY,
                username TEXT,
                ip TEXT,
                client_info TEXT,
                timestamp DATETIME
            );
        ''')
        # Create super admin
        cursor.execute('INSERT OR IGNORE INTO users (username, password, is_admin) VALUES (?, ?, 1)',
                      ('SomeSuperAdminUsernameIamNotTellingYou', 'NiceTrySourceCodeReaderItsNotThatEasyToHackMe))
        self.db.commit()

    async def handle_client(self, reader, writer):
        addr = writer.get_extra_info('peername')
        client_info = ''
        fingerprint = ''
        
        try:
            while True:
                data = await reader.read(1024)
                if not data:
                    break
                    
                handshake_type, payload = TLS12MimicProtocol.parse_packet(data)
                if not payload:
                    continue
                    
                msg = json.loads(payload.decode())
                cmd = msg.get('command')
                
                # Rate limiting
                now = time.time()
                self.rate_limits[addr[0]].append(now)
                self.rate_limits[addr[0]] = [t for t in self.rate_limits[addr[0]] if now - t < 30]
                if len(self.rate_limits[addr[0]]) > 2:
                    writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                        'error': 'Rate limit exceeded'
                    }).encode()))
                    continue
                
                # Login attempt limiting
                if cmd == 'login':
                    self.login_attempts[addr[0]].append(now)
                    self.login_attempts[addr[0]] = [t for t in self.login_attempts[addr[0]] if now - t < 60]
                    if len(self.login_attempts[addr[0]]) > 10:
                        self.db.cursor().execute('INSERT INTO bans (type, value, fingerprint) VALUES (?, ?, ?)',
                                               ('ip', addr[0], fingerprint))
                        self.db.commit()
                        continue
                
                # Check bans
                cursor = self.db.cursor()
                cursor.execute('SELECT * FROM bans WHERE type = ? AND value = ?', ('ip', addr[0]))
                if cursor.fetchone():
                    writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                        'error': 'IP banned'
                    }).encode()))
                    continue
                
                if cmd == 'register':
                    await self.handle_register(writer, msg, addr)
                elif cmd == 'login':
                    client_info = msg.get('client_info', '')
                    fingerprint = generate_fingerprint(addr[0], msg['username'], client_info)
                    await self.handle_login(writer, msg, addr, fingerprint, client_info)
                elif cmd in ('pm', 'group_msg', 'group_create', 'group_invite', 'group_request',
                           'group_list_public', 'admin'):
                    if addr not in self.clients:
                        writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                            'error': 'Not authenticated'
                        }).encode()))
                        continue
                    if cmd == 'pm':
                        await self.handle_pm(writer, msg)
                    elif cmd == 'group_msg':
                        await self.handle_group_msg(writer, msg)
                    elif cmd == 'group_create':
                        await self.handle_group_create(writer, msg)
                    elif cmd == 'group_invite':
                        await self.handle_group_invite(writer, msg)
                    elif cmd == 'group_request':
                        await self.handle_group_request(writer, msg)
                    elif cmd == 'group_list_public':
                        await self.handle_list_public_groups(writer)
                    elif cmd == 'admin':
                        await self.handle_admin(writer, msg, addr)
                        
        except Exception as e:
            print(f"Error handling client {addr}: {e}")
        finally:
            if addr in self.clients:
                del self.clients[addr]
            writer.close()
            await writer.wait_closed()

    async def handle_register(self, writer, msg, addr):
        username = msg.get('username')
        password = msg.get('password')
        invite_code = msg.get('invite_code')
        
        cursor = self.db.cursor()
        cursor.execute('SELECT * FROM invites WHERE code = ? AND used_by IS NULL', (invite_code,))
        invite = cursor.fetchone()
        
        if not invite:
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Invalid or used invite code'
            }).encode()))
            return
            
        cursor.execute('INSERT INTO users (username, password, invite_code) VALUES (?, ?, ?)',
                      (username, password, invite_code))
        cursor.execute('UPDATE invites SET used_by = ? WHERE code = ?', (username, invite_code))
        self.db.commit()
        
        writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
            'success': 'Registration successful'
        }).encode()))

    async def handle_login(self, writer, msg, addr, fingerprint, client_info):
        username = msg.get('username')
        password = msg.get('password')
        
        cursor = self.db.cursor()
        cursor.execute('SELECT * FROM bans WHERE type = ? AND value = ?', ('user', username))
        if cursor.fetchone():
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'User banned'
            }).encode()))
            return
            
        cursor.execute('SELECT * FROM bans WHERE fingerprint = ?', (fingerprint,))
        if cursor.fetchone():
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Fingerprint banned'
            }).encode()))
            return
            
        cursor.execute('SELECT * FROM users WHERE username = ? AND password = ?', (username, password))
        user = cursor.fetchone()
        
        if user:
            self.clients[addr] = {'writer': writer, 'username': username, 'fingerprint': fingerprint}
            cursor.execute('INSERT INTO fingerprints (fingerprint, username, ip, client_info, timestamp) VALUES (?, ?, ?, ?, ?)',
                         (fingerprint, username, addr[0], client_info, datetime.now()))
            self.db.commit()
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'success': 'Login successful'
            }).encode()))
        else:
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Invalid credentials'
            }).encode()))

    async def handle_pm(self, writer, msg):
        to_user = msg.get('to')
        message = msg.get('message')
        
        cursor = self.db.cursor()
        cursor.execute('SELECT username FROM users WHERE username = ?', (to_user,))
        if not cursor.fetchone():
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'User not found'
            }).encode()))
            return
            
        for addr, client in self.clients.items():
            if client['username'] == to_user:
                client['writer'].write(TLS12MimicProtocol.create_packet(1, json.dumps({
                    'type': 'pm',
                    'from': msg.get('from'),
                    'message': message
                }).encode()))
                await client['writer'].drain()

    async def handle_group_create(self, writer, msg):
        group_id = str(uuid.uuid4())
        name = msg.get('name')
        is_public = msg.get('is_public', False)
        owner = msg.get('username')
        
        cursor = self.db.cursor()
        cursor.execute('INSERT INTO groups (group_id, name, is_public, owner) VALUES (?, ?, ?, ?)',
                      (group_id, name, is_public, owner))
        cursor.execute('INSERT INTO group_members (group_id, username) VALUES (?, ?)',
                      (group_id, owner))
        self.db.commit()
        
        writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
            'success': f'Group {name} created with ID {group_id}'
        }).encode()))

    async def handle_group_invite(self, writer, msg):
        group_id = msg.get('group_id')
        username = msg.get('username')
        
        cursor = self.db.cursor()
        cursor.execute('SELECT * FROM groups WHERE group_id = ? AND owner = ?',
                      (group_id, msg.get('from')))
        if not cursor.fetchone():
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Not group owner'
            }).encode()))
            return
            
        cursor.execute('SELECT * FROM users WHERE username = ?', (username,))
        if not cursor.fetchone():
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'User not found'
            }).encode()))
            return
            
        cursor.execute('INSERT INTO group_members (group_id, username) VALUES (?, ?)',
                      (group_id, username))
        self.db.commit()
        
        writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
            'success': f'User {username} invited to group'
        }).encode()))

    async def handle_group_request(self, writer, msg):
        group_id = msg.get('group_id')
        username = msg.get('username')
        
        cursor = self.db.cursor()
        cursor.execute('SELECT * FROM groups WHERE group_id = ?', (group_id,))
        group = cursor.fetchone()
        if not group:
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Group not found'
            }).encode()))
            return
            
        if not group[2]:  # is_public
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Cannot request private group'
            }).encode()))
            return
            
        cursor.execute('INSERT INTO group_members (group_id, username) VALUES (?, ?)',
                      (group_id, username))
        self.db.commit()
        
        writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
            'success': f'Joined group {group[1]}'
        }).encode()))

    async def handle_group_msg(self, writer, msg):
        group_id = msg.get('group_id')
        message = msg.get('message')
        from_user = msg.get('from')
        
        cursor = self.db.cursor()
        cursor.execute('SELECT username FROM group_members WHERE group_id = ?', (group_id,))
        members = [row[0] for row in cursor.fetchall()]
        
        if from_user not in members:
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Not a group member'
            }).encode()))
            return
            
        for addr, client in self.clients.items():
            if client['username'] in members:
                client['writer'].write(TLS12MimicProtocol.create_packet(1, json.dumps({
                    'type': 'group_msg',
                    'group_id': group_id,
                    'from': from_user,
                    'message': message
                }).encode()))
                await client['writer'].drain()

    async def handle_list_public_groups(self, writer):
        cursor = self.db.cursor()
        cursor.execute('SELECT group_id, name FROM groups WHERE is_public = 1')
        groups = [{'id': row[0], 'name': row[1]} for row in cursor.fetchall()]
        
        writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
            'groups': groups
        }).encode()))

    async def handle_admin(self, writer, msg, addr):
        username = msg.get('from')
        action = msg.get('action')
        
        cursor = self.db.cursor()
        cursor.execute('SELECT is_admin FROM users WHERE username = ?', (username,))
        user = cursor.fetchone()
        
        if not user or not user[0]:
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'error': 'Not admin'
            }).encode()))
            return
            
        if action == 'generate_invite':
            code = str(uuid.uuid4())
            cursor.execute('INSERT INTO invites (code, created_by) VALUES (?, ?)', (code, username))
            self.db.commit()
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'invite_code': code
            }).encode()))
            
        elif action == 'grant_invite_power':
            target = msg.get('target')
            cursor.execute('UPDATE users SET is_admin = 1 WHERE username = ?', (target,))
            self.db.commit()
            
        elif action == 'revoke_invite_power':
            target = msg.get('target')
            cursor.execute('UPDATE users SET is_admin = 0 WHERE username = ? AND username != ?', 
                         (target, 'SomeSuperAdminUsernameIamNotTellingYou'))
            self.db.commit()
            
        elif action == 'ban_user':
            target = msg.get('target')
            fingerprint = msg.get('fingerprint')
            cursor.execute('INSERT INTO bans (type, value, fingerprint) VALUES (?, ?, ?)',
                         ('user', target, fingerprint))
            self.db.commit()
            
        elif action == 'list_blocked_users':
            cursor.execute('SELECT value, fingerprint FROM bans WHERE type = ?', ('user',))
            bans = [{'username': row[0], 'fingerprint': row[1]} for row in cursor.fetchall()]
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'banned_users': bans
            }).encode()))
            
        elif action == 'unblock_user':
            target = msg.get('target')
            cursor.execute('DELETE FROM bans WHERE type = ? AND value = ?', ('user', target))
            self.db.commit()
            
        elif action == 'block_ip':
            ip = msg.get('ip')
            cursor.execute('INSERT INTO bans (type, value) VALUES (?, ?)', ('ip', ip))
            self.db.commit()
            
        elif action == 'list_blocked_ips':
            cursor.execute('SELECT value FROM bans WHERE type = ?', ('ip',))
            bans = [row[0] for row in cursor.fetchall()]
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'banned_ips': bans
            }).encode()))
            
        elif action == 'unblock_ip':
            ip = msg.get('ip')
            cursor.execute('DELETE FROM bans WHERE type = ? AND value = ?', ('ip', ip))
            self.db.commit()
            
        elif action == 'kick':
            target = msg.get('target')
            for addr, client in list(self.clients.items()):
                if client['username'] == target:
                    client['writer'].write(TLS12MimicProtocol.create_packet(1, json.dumps({
                        'error': 'Kicked by admin'
                    }).encode()))
                    await client['writer'].drain()
                    client['writer'].close()
                    del self.clients[addr]
                    
        elif action == 'list_fingerprints':
            cursor.execute('SELECT fingerprint, username, ip, client_info, timestamp FROM fingerprints')
            fingerprints = [{'fingerprint': row[0], 'username': row[1], 'ip': row[2],
                           'client_info': row[3], 'timestamp': row[4]} for row in cursor.fetchall()]
            writer.write(TLS12MimicProtocol.create_packet(1, json.dumps({
                'fingerprints': fingerprints
            }).encode()))



client.py


This client.py implements a fully asynchronous chat client in Python using asyncio.


It connects to a remote chat server over TCP with an added TLS 1.2-like packet structure using our TLS12MimicProtocol.


The client supports a command-line interface (CLI) where users can register, login, private message (PM), create/join groups, and for admins, manage users, IP bans, and generate invite codes.


The below is the script output when you run client.py:


(chatter) ➜  chatter: python3 client.py 
Enter server host (default: localhost): X.X.X.X
Enter server port (default: 8443): 443
Connected to server at X.X.X.X:443

Welcome to the Chat Client!
Type /help to see available commands

Enter command or message: /help

Available commands:
    /help - Show this help message
    /register <username> <password> <invite_code> - Register a new account
    /login <username> <password> - Login to the server
    /pm <username> <message> - Send a private message
    /create_group <name> <public|private> - Create a new group
    /invite <group_id> <username> - Invite someone to your group
    /join <group_id> - Join a public group
    /group <group_id> <message> - Send a message to a group
    /list_groups - List all public groups
    
Admin commands:
    /gen_invite - Generate an invite code
    /grant <username> - Grant invite power to a user
    /revoke <username> - Revoke invite power from a user
    /ban <username> [fingerprint] - Ban a user
    /list_banned - List all banned users
    /unban <username> - Unban a user
    /block_ip <ip> - Block an IP address
    /list_ips - List all blocked IPs
    /unblock_ip <ip> - Unblock an IP address
    /kick <username> - Kick a user from the server
    /fingerprints - List all fingerprints
    
Other commands:
    /quit - Disconnect and quit
    

Enter command or message: /login superadmin supersecurepasswd

Enter command or message: Success: Login successful

Now for the actual client.py , it looks like this:

import asyncio
import json
import uuid
import os
import socket
import platform
import getpass
import sys
from datetime import datetime
from chat_protocol import TLS12MimicProtocol

class ChatClient:
    def __init__(self, host='X.X.X.X', port=443):
        self.host = host
        self.port = port
        self.reader = None
        self.writer = None
        self.username = None
        self.logged_in = False
        self.running = True
        self.client_info = self._get_client_info()
        
    def _get_client_info(self):
        """Generate client information string for fingerprinting"""
        try:
            hostname = socket.gethostname()
            os_info = f"{platform.system()} {platform.release()}"
            username = getpass.getuser()
            return f"{hostname}|{os_info}|{username}"
        except:
            return f"Unknown|{platform.system()}|Unknown"
    
    async def connect(self):
        try:
            self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
            print(f"Connected to server at {self.host}:{self.port}")
            return True
        except Exception as e:
            print(f"Failed to connect: {e}")
            return False
    
    async def send_message(self, msg_dict):
        """Send a message to the server"""
        # Add from field if logged in
        if self.logged_in and 'from' not in msg_dict and msg_dict.get('command') != 'login':
            msg_dict['from'] = self.username
            
        # Convert message to JSON and send
        msg_bytes = json.dumps(msg_dict).encode()
        packet = TLS12MimicProtocol.create_packet(1, msg_bytes)
        self.writer.write(packet)
        await self.writer.drain()
    
    async def receive_messages(self):
        """Receive and handle messages from the server"""
        try:
            while self.running:
                data = await self.reader.read(4096)
                
                if not data:
                    print("Disconnected from server")
                    self.running = False
                    break
                
                handshake_type, payload = TLS12MimicProtocol.parse_packet(data)
                if not payload:
                    continue
                
                try:
                    msg = json.loads(payload.decode())
                    await self._handle_message(msg)
                except json.JSONDecodeError:
                    print(f"Received invalid JSON data")
                except Exception as e:
                    print(f"Error handling message: {e}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if self.running:
                print(f"Connection error: {e}")
                self.running = False
    
    async def _handle_message(self, msg):
        """Handle different message types from server"""
        if 'error' in msg:
            print(f"Error: {msg['error']}")
            if msg['error'] == 'Kicked by admin':
                self.running = False
        elif 'success' in msg:
            print(f"Success: {msg['success']}")
        elif 'invite_code' in msg:
            print(f"Generated invite code: {msg['invite_code']}")
        elif 'type' in msg and msg['type'] == 'pm':
            print(f"\n[PM from {msg['from']}]: {msg['message']}")
        elif 'type' in msg and msg['type'] == 'group_msg':
            print(f"\n[Group {msg['group_id']}] {msg['from']}: {msg['message']}")
        elif 'groups' in msg:
            print("\nAvailable public groups:")
            for group in msg['groups']:
                print(f"ID: {group['id']} - Name: {group['name']}")
        elif 'banned_users' in msg:
            print("\nBanned users:")
            for user in msg['banned_users']:
                print(f"Username: {user['username']} - Fingerprint: {user['fingerprint']}")
        elif 'banned_ips' in msg:
            print("\nBanned IPs:")
            for ip in msg['banned_ips']:
                print(f"IP: {ip}")
        elif 'fingerprints' in msg:
            print("\nFingerprints:")
            for fp in msg['fingerprints']:
                print(f"User: {fp['username']} - IP: {fp['ip']} - Fingerprint: {fp['fingerprint']}")
                print(f"  Client Info: {fp['client_info']}")
                print(f"  Timestamp: {fp['timestamp']}")
                print("")
    
    async def register(self, username, password, invite_code):
        """Register a new user"""
        await self.send_message({
            'command': 'register',
            'username': username,
            'password': password,
            'invite_code': invite_code
        })
    
    async def login(self, username, password):
        """Login to the server"""
        await self.send_message({
            'command': 'login',
            'username': username,
            'password': password,
            'client_info': self.client_info
        })
        self.username = username
        self.logged_in = True
    
    async def send_pm(self, to_user, message):
        """Send a private message to a user"""
        await self.send_message({
            'command': 'pm',
            'to': to_user,
            'message': message
        })
    
    async def create_group(self, name, is_public=False):
        """Create a new group"""
        await self.send_message({
            'command': 'group_create',
            'name': name,
            'is_public': is_public,
            'username': self.username
        })
    
    async def invite_to_group(self, group_id, username):
        """Invite a user to a group"""
        await self.send_message({
            'command': 'group_invite',
            'group_id': group_id,
            'username': username
        })
    
    async def request_join_group(self, group_id):
        """Request to join a public group"""
        await self.send_message({
            'command': 'group_request',
            'group_id': group_id,
            'username': self.username
        })
    
    async def send_group_message(self, group_id, message):
        """Send a message to a group"""
        await self.send_message({
            'command': 'group_msg',
            'group_id': group_id,
            'message': message
        })
    
    async def list_public_groups(self):
        """List all public groups"""
        await self.send_message({
            'command': 'group_list_public'
        })
    
    async def admin_generate_invite(self):
        """Generate an invite code (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'generate_invite'
        })
    
    async def admin_grant_invite_power(self, target):
        """Grant invite power to another user (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'grant_invite_power',
            'target': target
        })
    
    async def admin_revoke_invite_power(self, target):
        """Revoke invite power from a user (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'revoke_invite_power',
            'target': target
        })
    
    async def admin_ban_user(self, target, fingerprint=None):
        """Ban a user (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'ban_user',
            'target': target,
            'fingerprint': fingerprint
        })
    
    async def admin_list_blocked_users(self):
        """List all blocked users (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'list_blocked_users'
        })
    
    async def admin_unblock_user(self, target):
        """Unblock a user (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'unblock_user',
            'target': target
        })
    
    async def admin_block_ip(self, ip):
        """Block an IP address (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'block_ip',
            'ip': ip
        })
    
    async def admin_list_blocked_ips(self):
        """List all blocked IPs (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'list_blocked_ips'
        })
    
    async def admin_unblock_ip(self, ip):
        """Unblock an IP address (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'unblock_ip',
            'ip': ip
        })
    
    async def admin_kick(self, target):
        """Kick a user from the server (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'kick',
            'target': target
        })
    
    async def admin_list_fingerprints(self):
        """List all fingerprints (admin only)"""
        await self.send_message({
            'command': 'admin',
            'action': 'list_fingerprints'
        })
    
    async def close(self):
        """Close the connection to the server"""
        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()
        self.running = False

async def display_help():
    """Display help information"""
    help_text = """
Available commands:
    /help - Show this help message
    /register <username> <password> <invite_code> - Register a new account
    /login <username> <password> - Login to the server
    /pm <username> <message> - Send a private message
    /create_group <name> <public|private> - Create a new group
    /invite <group_id> <username> - Invite someone to your group
    /join <group_id> - Join a public group
    /group <group_id> <message> - Send a message to a group
    /list_groups - List all public groups
    
Admin commands:
    /gen_invite - Generate an invite code
    /grant <username> - Grant invite power to a user
    /revoke <username> - Revoke invite power from a user
    /ban <username> [fingerprint] - Ban a user
    /list_banned - List all banned users
    /unban <username> - Unban a user
    /block_ip <ip> - Block an IP address
    /list_ips - List all blocked IPs
    /unblock_ip <ip> - Unblock an IP address
    /kick <username> - Kick a user from the server
    /fingerprints - List all fingerprints
    
Other commands:
    /quit - Disconnect and quit
    """
    print(help_text)

async def handle_user_input(client):
    """Handle user input from the terminal"""
    try:
        while client.running:
            command = await asyncio.get_event_loop().run_in_executor(None, input, "\nEnter command or message: ")
            
            if not command:
                continue
                
            if command.startswith('/'):
                parts = command.split()
                cmd = parts[0][1:]  # Remove the '/'
                
                if cmd == 'help':
                    await display_help()
                
                elif cmd == 'register' and len(parts) >= 4:
                    username = parts[1]
                    password = parts[2]
                    invite_code = parts[3]
                    await client.register(username, password, invite_code)
                
                elif cmd == 'login' and len(parts) >= 3:
                    username = parts[1]
                    password = parts[2]
                    await client.login(username, password)
                
                elif cmd == 'pm' and len(parts) >= 3:
                    username = parts[1]
                    message = ' '.join(parts[2:])
                    await client.send_pm(username, message)
                
                elif cmd == 'create_group' and len(parts) >= 3:
                    name = parts[1]
                    is_public = parts[2].lower() == 'public'
                    await client.create_group(name, is_public)
                
                elif cmd == 'invite' and len(parts) >= 3:
                    group_id = parts[1]
                    username = parts[2]
                    await client.invite_to_group(group_id, username)
                
                elif cmd == 'join' and len(parts) >= 2:
                    group_id = parts[1]
                    await client.request_join_group(group_id)
                
                elif cmd == 'group' and len(parts) >= 3:
                    group_id = parts[1]
                    message = ' '.join(parts[2:])
                    await client.send_group_message(group_id, message)
                
                elif cmd == 'list_groups':
                    await client.list_public_groups()
                
                # Admin commands
                elif cmd == 'gen_invite':
                    await client.admin_generate_invite()
                
                elif cmd == 'grant' and len(parts) >= 2:
                    await client.admin_grant_invite_power(parts[1])
                
                elif cmd == 'revoke' and len(parts) >= 2:
                    await client.admin_revoke_invite_power(parts[1])
                
                elif cmd == 'ban' and len(parts) >= 2:
                    fingerprint = parts[2] if len(parts) > 2 else None
                    await client.admin_ban_user(parts[1], fingerprint)
                
                elif cmd == 'list_banned':
                    await client.admin_list_blocked_users()
                
                elif cmd == 'unban' and len(parts) >= 2:
                    await client.admin_unblock_user(parts[1])
                
                elif cmd == 'block_ip' and len(parts) >= 2:
                    await client.admin_block_ip(parts[1])
                
                elif cmd == 'list_ips':
                    await client.admin_list_blocked_ips()
                
                elif cmd == 'unblock_ip' and len(parts) >= 2:
                    await client.admin_unblock_ip(parts[1])
                
                elif cmd == 'kick' and len(parts) >= 2:
                    await client.admin_kick(parts[1])
                
                elif cmd == 'fingerprints':
                    await client.admin_list_fingerprints()
                
                elif cmd == 'quit':
                    print("Disconnecting...")
                    client.running = False
                
                else:
                    print("Invalid command. Type /help for a list of commands.")
            
            else:
                # If not a command and user is logged in, treat as a message to broadcast (not supported)
                print("Use /pm <username> <message> to send private messages")
                print("Use /group <group_id> <message> to send group messages")
                
    except asyncio.CancelledError:
        pass
    except Exception as e:
        print(f"Error in user input handler: {e}")
        client.running = False

async def main():
    # Get server details
    host = input("Enter server host (default: localhost): ") or "localhost"
    port = input("Enter server port (default: 8443): ") or "8443"
    
    try:
        port = int(port)
    except ValueError:
        print("Invalid port number. Using default 8443.")
        port = 8443
    
    # Create client and connect to server
    client = ChatClient(host, port)
    if not await client.connect():
        return
    
    try:
        # Start receiver task
        receiver_task = asyncio.create_task(client.receive_messages())
        
        # Show welcome message and help
        print("\nWelcome to the Chat Client!")
        print("Type /help to see available commands")
        
        # Start input handler
        await handle_user_input(client)
        
        # Cleanup
        receiver_task.cancel()
        try:
            await receiver_task
        except asyncio.CancelledError:
            pass
        
        await client.close()
        
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if not receiver_task.done():
            receiver_task.cancel()
        print("Disconnected. Goodbye!")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram terminated by user.")

Tricking most firewalls


The beauty of this system is even after being so simple and pathetically insecure it was able to bypass the protocol filtering of AWS. Thats right folks imagine my suprise when I had 5 instances of AWS being able to talk with each other when the only port open was 443 in the respective Security Groups.


The Inbound rule for all the machines looked like this:


Screenshot 2025-04-26 at 6


The Outbound rule for all the machines looked like this:


Screenshot 2025-04-26 at 6


When I selct to allow HTTPS traffic --> ONLY legit HTTPS/TLS traffic should be routed through port 443. I am aware that AWS does allow Custom TCP protocols via the "Custom TCP" option in the Type section of the configuration. But in this case that was not selected and traffic was still allowed through--> This is dangerous.


This trend continued with many firewalls --> I also tested Pfsense , Sophos and Palo Alto. I just for the hell of it also tested it against Windows Firewall. None of them could even come close to detecting this was not legitimate port 443 traffic.


I have always been very skeptical about NGFWs and Cloud solutions. This further reinforces my belief that we cannot take vendors/OEMs words of black-box systems that it will work. It may not. NGFWs are just glorified packet filters with extra marketing —-> layer 7 snake oil won’t save you if your rules are a dumpster fire and you do not have serious micro-segmentation.


I also tested the protocol against many AntiVirus providers on Windows 11 by making a simple InfoStealer Malware and infecting my virtual machines. The following is the Hall of Shame of vendors who could not detect anything while the entire filesystem was being exfiltrated 🥲.


  1. Kaspersky Premium
  2. eScan Total Security Suite
  3. K7 Ultimate Security
  4. Bitdefender Total Security
  5. McAfee+ Ultimate
  6. Malwarebytes Total
  7. Quick Heal Total Security

Conclusion


These are the kinds of things you discover when you get breach attack simulation done by real hackers and not GRC human robots who barely understand what they are talking about.


Remeber that first and foremost Security is a engineering problem. Not a GRC problem. In my humble opinion GRC is less than 20% of real security.


Never Trust, Always Verify 🔒
Always Assume You have been Breached


Wake up. Dig deeper.


gladgers-hacker-gers-guardians-of-galaxy



Twitter LinkedIn Contact me on Signal

Contact me via email


#Linux #Malware #cyber security #database #development #hacking #python #research

← Back to blog