#!/usr/local/bin/python -u # -*- Mode: python -*- # # Copyright (C) 2008 David A. Helder # # GNU General Public Licence (GPL) # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA import sys import string import re import os import pwd import time from namber import * from twisted.internet import reactor, defer, error from twisted.names.common import ResolverBase from twisted.names.server import DNSServerFactory from twisted.protocols.dns import DNSDatagramProtocol from twisted.protocols import dns class NamberResolver(ResolverBase): ttl = 60*60*24 def __init__(self): ResolverBase.__init__(self) self.domain = "mysteryrobot.com" self.ns_name = "ns.mysteryrobot.com" self.ns2_name = "ns2.mysteryrobot.com" self.ns_addr = "69.55.236.117" self.mx_name = "widget.gizmolabs.org" self.www_name = "www." + self.domain self.www_addr = "69.55.236.117" self.mx = dns.Record_MX(0, self.mx_name) self.soa = dns.Record_SOA(self.ns_name, 'dhelder.mysteryrobot.com', serial=37337, refresh=10800, retry=3600, expire=604800, minimum=86400) def _lookup(self, name, cls, type, timeout): timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time())) print timestamp, if name == None: print "FAIL (bad name)" return defer.fail(dns.AuthoritativeDomainError(name)) if cls != dns.IN: print "FAIL", name, cls, "(weird class)" return defer.fail(dns.AuthoritativeDomainError(name)) if name.lower().endswith(self.domain): return self.domain_lookup(name, cls, type) # elif name.endswith("in-addr.arpa"): # return self.reverse_lookup(name, cls, type) return defer.fail(dns.DomainError(name)) def domain_lookup(self, name, cls, type): results = [] authority = [] additional = [] if type == dns.A or type == dns.AAAA or type == dns.A6: # name server name if name.lower() == self.ns_name or name.lower() == self.ns2_name: print "LOOK", name, self.ns_addr a = dns.Record_A(self.ns_addr) rr = dns.RRHeader(name, dns.A, dns.IN, self.ttl, a) results.append(rr) # domain name and www name elif name.lower() == self.domain or name.lower() == self.www_name: print "LOOK", name, self.www_addr a = dns.Record_A(self.www_addr) rr = dns.RRHeader(name, dns.A, dns.IN, self.ttl, a) results.append(rr) # namber name else: n = name[:len(name) - len(self.domain) - 1] addr = namber_to_address(n) if not addr: print "FAIL", name return defer.fail(dns.AuthoritativeDomainError(name)) print "LOOK", name, addr a = dns.Record_A(addr) rr = dns.RRHeader(name, dns.A, dns.IN, self.ttl, a) results.append(rr) # If we match, but type is AAAA or A6, just send a SOA if type == dns.AAAA or type == dns.A6: rr = dns.RRHeader(self.domain, dns.SOA, dns.IN, self.ttl, self.soa) authority.append(rr) results = [] return defer.succeed((results, authority, additional)) elif type == dns.MX and self.mx and \ (name.lower() == self.domain or \ name.lower() == self.www_name): print "LOOK", name, self.mx_name, "(MX)" rr = dns.RRHeader(name, dns.MX, dns.IN, self.ttl, self.mx) results.append(rr) elif type == dns.SOA and self.soa: print "LOOK", name, self.ns_name, "(SOA)" rr = dns.RRHeader(self.domain, dns.SOA, dns.IN, self.ttl, self.soa) authority.append(rr) return defer.succeed((results, authority, additional)) else: print "FAIL %s (type %d)" % (name, type) return defer.fail(dns.AuthoritativeDomainError(name)) ns = dns.Record_NS(self.ns_name) rr = dns.RRHeader(self.domain, dns.NS, dns.IN, self.ttl, ns) authority.append(rr) ns2 = dns.Record_NS(self.ns2_name) rr = dns.RRHeader(self.domain, dns.NS, dns.IN, self.ttl, ns2) authority.append(rr) a = dns.Record_A(self.ns_addr) rr = dns.RRHeader(self.ns_name, dns.A, dns.IN, self.ttl, a) additional.append(rr) a = dns.Record_A(self.ns_addr) rr = dns.RRHeader(self.ns2_name, dns.A, dns.IN, self.ttl, a) additional.append(rr) return defer.succeed((results, authority, additional)) def reverse_lookup(self, name, cls, type): results = [] authority = [] additional = [] if type == dns.PTR: addr = name[:len(name) - len("in-addr.arpa") - 1] l = string.split(addr, ".") l.reverse() addr = string.join(l, ".") nam = address_to_namber(addr) if not nam: print "FAIL %s" % name return defer.fail(dns.AuthoritativeDomainError(name)) nam += "." + self.domain + "." ptr = dns.Record_PTR(nam) rr = dns.RRHeader(name, dns.PTR, dns.IN, self.ttl, ptr) results.append(rr) else: return defer.fail(dns.AuthoritativeDomainError(name)) ns = dns.Record_NS(self.ns_name) rr = dns.RRHeader('in-addr.arpa', dns.NS, dns.IN, self.ttl, ns) authority.append(rr) a = dns.Record_A(self.ns_addr) rr = dns.RRHeader(self.ns_name, dns.A, dns.IN, self.ttl, a) additional.append(rr) return defer.succeed((results, authority, additional)) def main(): dns_port = 8000 if os.getgid() == 0: dns_port = 53 # Load nambers namber_loadfile("nambers.txt") # Create DNS server nr = NamberResolver() factory = DNSServerFactory([nr], None, None, 0) protocol = DNSDatagramProtocol(factory) # Attempt to bind try: reactor.listenUDP(dns_port, protocol) reactor.listenTCP(dns_port, factory) except error.CannotListenError, e: print "ERROR:", e sys.exit(1) # Change to user/group nobody/nobody if root/root if os.getgid() == 0: os.setgid(pwd.getpwnam("nobody")[3]) if os.getuid() == 0: os.setuid(pwd.getpwnam("nobody")[2]) reactor.run() if __name__ == '__main__': main()