Xer0x's Underground

DNS Server in Python


Intro


In this post, I explain line by line how I implemented most of the DNS protocol in Python to build my own local DNS server that I can trust (yes, I am THAT paranoid). These are only my notes for this project, so do not expect anything more.


Imports

import socket
import struct
import random
import threading
import time
from collections import OrderedDict
import urllib.request
import os
import hashlib

Configuration Constants

DNS_SERVER = '9.9.9.9'
BLOCKLIST_URLS = [
    'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts',
    'https://raw.githubusercontent.com/hagezi/dns-blocklists/main/hosts/pro-compressed.txt',
]
BLOCKLIST_CACHE_DIR = 'blocklist_cache'
BLOCKLIST_CACHE_TTL = 24 * 60 * 60  # 24 hours

BlocklistCache Class

This class manages blocklists and their caching.

Initialization

class BlocklistCache:
    def __init__(self):
        self.blocked_domains = set()
        self.last_update = 0
        self.lock = threading.Lock()
        self.cache_dir = BLOCKLIST_CACHE_DIR
        os.makedirs(self.cache_dir, exist_ok=True)

Helper Functions

def _get_cache_path(self, url):
    # MD5 is used only to derive a stable file name from the URL, not for security
    url_hash = hashlib.md5(url.encode()).hexdigest()
    return os.path.join(self.cache_dir, f"blocklist_{url_hash}.txt")

def _is_cache_valid(self, cache_path):
    # A cache file is valid if it exists and is younger than the TTL
    if not os.path.exists(cache_path):
        return False
    cache_age = time.time() - os.path.getmtime(cache_path)
    return cache_age < BLOCKLIST_CACHE_TTL

Download and Parse Blocklists

def _download_and_cache_blocklist(self, url):
    cache_path = self._get_cache_path(url)
    try:
        if self._is_cache_valid(cache_path):
            with open(cache_path, 'r') as f:
                return f.readlines()

        response = urllib.request.urlopen(url)
        content = response.read().decode('utf-8').splitlines()
        with open(cache_path, 'w') as f:
            f.write('\n'.join(content))
        return content
    except Exception as e:
        print(f"Error downloading blocklist {url}: {e}")
        if os.path.exists(cache_path):
            with open(cache_path, 'r') as f:
                return f.readlines()
        return []
def update_blocklists(self):
    with self.lock:
        new_blocked_domains = set()
        for url in BLOCKLIST_URLS:
            content = self._download_and_cache_blocklist(url)
            for line in content:
                line = line.strip()
                if line and not line.startswith('#'):
                    parts = line.split()
                    if len(parts) >= 2 and parts[0] in {'0.0.0.0', '127.0.0.1'}:
                        new_blocked_domains.add(parts[1].lower())
        self.blocked_domains = new_blocked_domains
        self.last_update = time.time()
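
def is_blocked(self, domain):
    current_time = time.time()
    if current_time - self.last_update > BLOCKLIST_CACHE_TTL:
        # Mark the refresh as started so later queries do not each spawn a thread
        self.last_update = current_time
        threading.Thread(target=self.update_blocklists, daemon=True).start()
    return domain.lower() in self.blocked_domains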

DNSCache Class

This class manages an in-memory cache for DNS responses, bounded by a maximum total size.

Initialization

class DNSCache:
    def __init__(self, max_size=200 * 1024 * 1024):
        self.cache = OrderedDict()
        self.lock = threading.Lock()
        self.current_size = 0
        self.max_size = max_size

Cache Management

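def set(self, key, value, ttl):
    with self.lock:
        expiration = time.time() + ttl
        entry_size = self._calculate_entry_size(key, value)
        # Overwriting a key must not count its old size twice
        if key in self.cache:
            old_value, _ = self.cache.pop(key)
            self.current_size -= self._calculate_entry_size(key, old_value)
        # Stop once the cache is empty so an oversized entry cannot loop forever
        while self.cache and self.current_size + entry_size > self.max_size:
            self._evict_oldest()
        self.cache[key] = (value, expiration)
        self.cache.move_to_end(key)
        self.current_size += entry_size

def get(self, key):
    with self.lock:
        now = time.time()
        if key in self.cache:
            value, expiration = self.cache[key]
            if expiration > now:
                # Still fresh: mark as recently used and return it
                self.cache.move_to_end(key)
                return value
            # Expired: drop the entry and give back its size
            del self.cache[key]
            self.current_size -= self._calculate_entry_size(key, value)
    return None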
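
The two helpers that set and get rely on, _calculate_entry_size and _evict_oldest, are not shown in these notes. Here is a minimal sketch of what they could look like. The class name DNSCacheSketch is made up for illustration, and I am assuming keys are strings like "example.com:1" and values are the raw response bytes.

```python
import threading
from collections import OrderedDict


class DNSCacheSketch:
    """Hypothetical stand-in for DNSCache, showing only the two helpers."""

    def __init__(self, max_size=200 * 1024 * 1024):
        self.cache = OrderedDict()
        self.lock = threading.Lock()
        self.current_size = 0
        self.max_size = max_size

    def _calculate_entry_size(self, key, value):
        # Assumption: key is a str and value is bytes, so byte lengths
        # give a rough (not exact) estimate of the memory used.
        return len(key.encode("utf-8")) + len(value)

    def _evict_oldest(self):
        # Caller must already hold self.lock.
        if not self.cache:
            return False
        # popitem(last=False) removes the entry at the front of the
        # OrderedDict, i.e. the one that was touched least recently.
        key, (value, _expiration) = self.cache.popitem(last=False)
        self.current_size -= self._calculate_entry_size(key, value)
        return True
```

Returning a boolean from _evict_oldest is my own choice here: it lets a caller notice an empty cache instead of looping on it.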

DNS Query/Response Functions

These functions construct, parse, and forward DNS packets:


build_query(domain, query_type)

Purpose:

This function constructs a DNS query packet. DNS queries are sent to a server to resolve domain names into IP addresses (or other record types).

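Parameters:

  - domain: The domain name to resolve (e.g., example.com).
  - query_type: The numeric record type to request (e.g., 1 for A records).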

Process:

  1. Transaction ID:
     - A random 16-bit integer is generated using random.randint(0, 65535).
     - This helps match the response to the request.
  2. Flags:
     - A fixed value of 0x0100 is used, indicating:
       - Standard query.
       - Recursion desired (ask upstream server if needed).
  3. Header Construction:
     - Six 16-bit fields are packed using struct.pack(">HHHHHH"):
       - Transaction ID.
       - Flags.
       - QDCOUNT: 1 (one question in the query).
       - ANCOUNT, NSCOUNT, ARCOUNT: 0 (no answers, authorities, or additional records yet).
  4. Question Section:
     - The domain is split into labels (e.g., example.com → example and com).
     - Each label is prefixed with its length and encoded in bytes.
     - A null byte (\x00) is added to signify the end of the domain name.
  5. Query Type and Class:
     - Appends two fields:
       - Query type (e.g., 1 for A records).
       - Query class (1 for Internet).
  6. Return:
     - Combines the header and question sections into a complete DNS query packet.
     - Returns the packet and the transaction ID for reference.
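
The steps above can be sketched roughly like this. It is a minimal version written from the description, not necessarily identical to the full program; the default query_type=1 and the trailing-dot handling are my own additions, and it does not check the 63-byte label limit.

```python
import random
import struct


def build_query(domain, query_type=1):
    """Build a DNS query packet; returns (packet, transaction_id)."""
    transaction_id = random.randint(0, 65535)
    flags = 0x0100  # standard query with the RD (recursion desired) bit set
    # Header: ID, flags, QDCOUNT=1, ANCOUNT=0, NSCOUNT=0, ARCOUNT=0
    header = struct.pack(">HHHHHH", transaction_id, flags, 1, 0, 0, 0)

    question = b""
    # Assumption: a trailing dot ("example.com.") is tolerated and ignored
    for label in domain.rstrip(".").split("."):
        encoded = label.encode("ascii")
        question += struct.pack("B", len(encoded)) + encoded
    question += b"\x00"  # zero-length root label ends the name
    question += struct.pack(">HH", query_type, 1)  # QTYPE, QCLASS=1 (IN)

    return header + question, transaction_id
```

For example.com with type 1 this gives a 29-byte packet: 12 bytes of header, 13 bytes for the encoded name, and 4 bytes for type and class.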

parse_query(data)

Purpose:

Parses a received DNS query packet to extract the domain name and query type.

Parameters:

  - data: The raw bytes of the DNS query packet received from the client.

Process:

  1. Skip the Header:
     - The first 12 bytes are the DNS header. The domain starts after that.
  2. Domain Name Parsing:
     - Iterates through the domain name labels:
       - Reads the length byte.
       - Extracts the corresponding number of bytes as the label.
       - Stops when a zero-length label (0) is encountered, indicating the end of the domain.
  3. Extract Query Type and Class:
     - Reads the next 4 bytes after the domain name:
       - Query type (2 bytes).
       - Query class (2 bytes).
  4. Return:
     - Returns the domain name (e.g., example.com) and the query type (e.g., 1 for A records).
     - If parsing fails, it prints an error and returns None, None.
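
A minimal sketch of that parsing loop, written from the steps above. It assumes the question name is made of plain labels only (no compression pointers, which normal client queries do not use), and the exact exceptions caught are my own choice.

```python
import struct


def parse_query(data):
    """Return (domain, query_type) from a raw DNS query, or (None, None)."""
    try:
        offset = 12  # the fixed-size header occupies bytes 0..11
        labels = []
        while True:
            length = data[offset]  # IndexError here means a truncated packet
            offset += 1
            if length == 0:  # zero-length label marks the end of the name
                break
            labels.append(data[offset:offset + length].decode("ascii"))
            offset += length
        # QTYPE and QCLASS follow the name, 2 bytes each, big-endian
        query_type, _query_class = struct.unpack(">HH", data[offset:offset + 4])
        return ".".join(labels), query_type
    except (IndexError, struct.error, UnicodeDecodeError) as e:
        print(f"Error parsing query: {e}")
        return None, None
```

Feeding it a 12-byte header followed by b"\x07example\x03com\x00" and a type/class pair of 28 and 1 returns ("example.com", 28); a packet shorter than the header returns (None, None).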

parse_response(data)

Purpose:

Parses a DNS response packet to extract detailed information such as answers, authorities, and additional records.

Parameters:

  - data: The raw bytes of the DNS response packet.

Process:

  1. Helper Functions:
     - parse_name(data, offset): Resolves domain names from the packet using labels and pointers.
     - parse_soa(data, offset): Extracts SOA (Start of Authority) record details.
  2. Header Parsing:
     - Extracts counts of:
       - Questions (QDCOUNT).
       - Answers (ANCOUNT).
       - Authority records (NSCOUNT).
       - Additional records (ARCOUNT).
  3. Questions Section:
     - Skips over the question section (domain name, query type, and class).
  4. Resource Records:
     - Parses answer, authority, and additional sections using the helper functions:
       - A Records: IPv4 addresses.
       - AAAA Records: IPv6 addresses.
       - CNAME: Canonical names.
       - SOA: Authoritative server details.
       - MX: Mail exchange servers.
       - TXT: Text records.
       - NS: Name servers.
  5. Return:
     - Returns a dictionary containing parsed sections (answers, authority, additional).
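
The trickiest part of this function is parse_name, because responses compress names with pointers: a byte whose top two bits are set (0xC0) starts a 2-byte pointer to an earlier offset in the packet. Below is a minimal sketch of how such a helper could work. The return shape (name, next_offset) and the jump limit of 20 are assumptions of mine, not taken from the full program.

```python
def parse_name(data, offset):
    """Decode a possibly compressed name; return (name, next_offset)."""
    labels = []
    next_offset = None  # where the record continues after this name
    jumps = 0
    while True:
        length = data[offset]
        if length & 0xC0 == 0xC0:  # compression pointer
            pointer = ((length & 0x3F) << 8) | data[offset + 1]
            if next_offset is None:
                # Only the first pointer decides where parsing resumes
                next_offset = offset + 2
            offset = pointer
            jumps += 1
            if jumps > 20:  # guard against malicious pointer loops
                raise ValueError("too many compression pointers")
            continue
        offset += 1
        if length == 0:  # root label: end of the name
            break
        labels.append(data[offset:offset + length].decode("ascii"))
        offset += length
    if next_offset is None:
        next_offset = offset
    return ".".join(labels), next_offset
```

The loop guard matters for a server that parses packets from the network: without it, a pointer that points at itself would hang the parser.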

query_upstream(domain, query_type)

Purpose:

Forwards DNS queries to an upstream DNS server for resolution.

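Parameters:

  - domain: The domain name to resolve (e.g., example.com).
  - query_type: The numeric record type to request (e.g., 1 for A records).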

Process:

  1. Build Query:
     - Calls build_query to construct a DNS query packet for the specified domain and type.
  2. Create Socket:
     - Creates a UDP socket using socket.socket(socket.AF_INET, socket.SOCK_DGRAM).
     - Sets a timeout of 5 seconds to avoid hanging indefinitely.
  3. Send Query:
     - Sends the query packet to the upstream DNS server (default 9.9.9.9 on port 53).
  4. Receive Response:
     - Waits for a response packet (up to 512 bytes) from the server.
  5. Close Socket:
     - Closes the socket to release resources.
  6. Return:
     - Returns the raw response packet for further processing.
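
Put together, the forwarding step might look like the sketch below. The server, port and timeout keyword arguments are hypothetical additions so the function can be pointed at a test server, the transaction ID check at the end is an extra safety step not listed above, and a compact build_query is repeated only so the example runs on its own.

```python
import random
import socket
import struct

DNS_SERVER = "9.9.9.9"


def build_query(domain, query_type):
    """Compact query builder so this sketch runs on its own."""
    transaction_id = random.randint(0, 65535)
    header = struct.pack(">HHHHHH", transaction_id, 0x0100, 1, 0, 0, 0)
    name = b"".join(
        bytes([len(label)]) + label.encode("ascii")
        for label in domain.rstrip(".").split(".")
    )
    question = name + b"\x00" + struct.pack(">HH", query_type, 1)
    return header + question, transaction_id


def query_upstream(domain, query_type, server=DNS_SERVER, port=53, timeout=5.0):
    """Send one UDP query and return the raw response bytes."""
    packet, _transaction_id = build_query(domain, query_type)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(timeout)  # recvfrom raises socket.timeout after this
        sock.sendto(packet, (server, port))
        response, _ = sock.recvfrom(512)
    finally:
        sock.close()  # always release the socket, even on timeout
    # Assumption: reject replies whose ID does not match what we sent
    if len(response) < 12 or response[:2] != packet[:2]:
        raise ValueError("response does not match the query")
    return response
```

Calling query_upstream("example.com", 1) with the defaults sends the query to 9.9.9.9 on port 53; a caller should be ready for socket.timeout if the upstream never answers.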

Server Logic

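def handle_client(server_socket):
    try:
        data, client_addr = server_socket.recvfrom(512)
        domain, query_type = parse_query(data)
        if domain is None:
            return  # malformed query; nothing sensible to answer
        # The first two bytes of the packet are the client's transaction ID
        client_transaction_id = struct.unpack(">H", data[:2])[0]
        if blocklist_cache.is_blocked(domain):
            response = get_blocked_response(client_transaction_id)
        else:
            cached_response = dns_cache.get(f"{domain}:{query_type}")
            response = cached_response or query_upstream(domain, query_type)
            # Cached and upstream packets carry a different ID, so restore the client's
            response = data[:2] + response[2:]
        server_socket.sendto(response, client_addr)
    except Exception as e:
        print(f"Error handling client: {e}")
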
def start_server():
    print("Initializing blocklist cache...")
    blocklist_cache.update_blocklists()
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.bind(('0.0.0.0', 853))
    print("DNS server running on port 853...")
    while True:
        handle_client(server_socket)
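
get_blocked_response is called above but not shown in these notes. One way it could work is to answer with an NXDOMAIN header that reuses the client's transaction ID. This is only a sketch under that assumption: the optional question argument is hypothetical, and the full program may block differently (for example by answering with 0.0.0.0).

```python
import struct


def get_blocked_response(transaction_id, question=b""):
    """Build a minimal NXDOMAIN reply for a blocked domain."""
    # QR=1 (response), RD=1, RA=1, RCODE=3 (NXDOMAIN)
    flags = 0x8000 | 0x0100 | 0x0080 | 0x0003
    # Hypothetical extra: echo the question section back if the caller has it
    qdcount = 1 if question else 0
    header = struct.pack(">HHHHHH", transaction_id, flags, qdcount, 0, 0, 0)
    return header + question
```

Some clients are stricter and expect the original question echoed back, which is why the sketch leaves room for it.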


Read more about the upstream 9.9.9.9 here.


Download the complete program here.

