--- /dev/null
+#!/usr/bin/python
+
+import ConfigParser
+import os
+import re
+import sys
+
+
+def _is_debug():
+ return False
+
+
+def _log(msg):
+ sys.stderr.write('backend (%s): %s\n' % (os.getpid(), msg))
+
+
def _write(*fields):
    """Emit one tab-separated, newline-terminated record on stdout.

    Every field and every separator is written with its own
    ``sys.stdout.write`` call, and stdout is flushed once the record is
    complete.  When debugging is on, each write is announced on stderr first.

    :param fields: the strings that make up the record, in output order.
    """
    total = len(fields)
    for position, field in enumerate(fields, 1):
        if _is_debug():
            _log('writing: %s' % field)
        sys.stdout.write(field)

        # A separator follows every field except the last one.
        if position >= total:
            continue
        if _is_debug():
            _log('writetab')
        sys.stdout.write('\t')

    if _is_debug():
        _log('writenewline')
    sys.stdout.write('\n')
    sys.stdout.flush()
+
+
def _get_next():
    """Read the next line from stdin and split it into tab-separated fields.

    Surrounding whitespace (including the line terminator) is stripped before
    splitting, so an empty read comes back as ``['']``.

    :return: list of the fields found on the line.
    """
    if _is_debug():
        _log('reading now')
    raw = sys.stdin.readline()
    if _is_debug():
        _log('read line: %s' % raw)

    fields = raw.strip().split('\t')
    return fields
+
+
class DynamicBackend:
    """DNS backend that answers from IP addresses embedded in query names.

    Queries arrive as tab-separated lines on stdin and are answered with
    tab-separated ``DATA``/``LOG``/``FAIL``/``END`` lines on stdout.  A name
    of the form ``[<anything>.]<a>.<b>.<c>.<d>.<domain>`` is answered with an
    A record for ``a.b.c.d``; every other name under the domain is answered
    with the backend's own address.

    NOTE(review): the HELO/Q/DATA/END exchange looks like version 1 of the
    PowerDNS pipe-backend protocol -- confirm against the deployment.
    """

    def __init__(self):
        # All values are kept as strings because they are written verbatim
        # into response lines.
        self.id = ''
        self.soa = ''
        self.domain = ''
        self.ip_address = ''
        self.ttl = ''
        # Maps name-server host name -> its IP address.
        self.name_servers = {}
        # Dotted-quad addresses that must never be handed out.
        self.blacklisted_ips = []

    def configure(self):
        """Load the backend settings from the configuration file.

        Reads ``id``, ``ns`` and ``hostmaster`` from section ``soa``;
        ``domain``, ``ipaddress`` and ``ttl`` from section ``main``; every
        entry of section ``nameservers``; and, when present, the values of
        section ``blacklist``.  The loaded settings are logged to stderr.

        Exits the process with status 1 when the file does not exist.
        """
        fname = self._get_config_filename()
        if not os.path.exists(fname):
            _log('%s does not exist' % fname)
            sys.exit(1)

        with open(fname) as fp:
            config = ConfigParser.ConfigParser()
            config.readfp(fp)

        self.id = config.get('soa', 'id')
        self.soa = '%s %s %s' % (config.get('soa', 'ns'), config.get('soa', 'hostmaster'), self.id)
        self.domain = config.get('main', 'domain')
        self.ip_address = config.get('main', 'ipaddress')
        self.ttl = config.get('main', 'ttl')

        for name, address in config.items('nameservers'):
            self.name_servers[name] = address

        # The blacklist section is optional; only its values are used.
        if config.has_section("blacklist"):
            for _, address in config.items("blacklist"):
                self.blacklisted_ips.append(address)

        _log('Name servers: %s' % self.name_servers)
        _log('ID: %s' % self.id)
        _log('TTL %s' % self.ttl)
        _log('SOA: %s' % self.soa)
        _log('IP Address: %s' % self.ip_address)
        _log('DOMAIN: %s' % self.domain)
        _log("Blacklist: %s" % self.blacklisted_ips)

    def run(self):
        """Perform the handshake, then answer queries until ``END`` is read.

        The first line must carry ``1`` as its second field, otherwise the
        process exits with status 1.  Each later line needs at least six
        fields; the second is the query name and the fourth the query type.
        Shorter lines are answered with ``FAIL``.
        """
        _log('starting up')
        handshake = _get_next()
        # A short or empty handshake line is rejected like a wrong version
        # instead of blowing up with an IndexError.
        if len(handshake) < 2 or handshake[1] != '1':
            _log('Not version 1: %s' % handshake)
            sys.exit(1)
        _write('OK', 'We are good')
        _log('Done handshake')

        while True:
            cmd = _get_next()
            if _is_debug():
                _log("cmd: %s" % cmd)

            if cmd[0] == "END":
                _log("completing")
                break

            # NOTE(review): _get_next() returns [''] for a blank line and also
            # once stdin is exhausted, so after EOF this branch is taken on
            # every iteration.  Stopping cleanly needs an explicit EOF signal
            # from _get_next().
            if len(cmd) < 6:
                _log('did not understand: %s' % cmd)
                _write('FAIL')
                continue

            qname = cmd[1].lower()
            qtype = cmd[3]
            in_domain = self._in_domain(qname)

            if qtype in ('A', 'ANY') and in_domain:
                if qname == self.domain:
                    self.handle_self(self.domain)
                elif qname in self.name_servers:
                    self.handle_nameservers(qname)
                else:
                    self.handle_subdomains(qname)
            elif qtype == 'SOA' and in_domain:
                self.handle_soa(qname)
            else:
                self.handle_unknown(qtype, qname)

    def handle_self(self, name):
        """Answer ``name`` with the backend's own address plus the NS records."""
        _write('DATA', name, 'IN', 'A', self.ttl, self.id, self.ip_address)
        self.write_name_servers(name)
        _write('END')

    def handle_subdomains(self, qname):
        """Answer a name below the domain from the IP embedded in its labels.

        The four labels directly in front of the domain are taken as the
        address.  If there are fewer than four labels, or they do not form a
        valid IPv4 address, the backend's own address is returned instead.
        A blacklisted address is answered with a ``LOG`` line only.
        """
        # Strip the domain as a trailing suffix; searching for its first
        # occurrence would cut names that contain the domain text earlier on.
        suffix = '.' + self.domain
        subdomain = qname[:-len(suffix)] if qname.endswith(suffix) else ''

        subparts = subdomain.split('.')
        if len(subparts) < 4:
            if _is_debug():
                _log('subparts less than 4')
            self.handle_self(qname)
            return

        ip_address_parts = subparts[-4:]
        if _is_debug():
            _log('ip: %s' % ip_address_parts)

        try:
            ip_address = self._canonical_ip(ip_address_parts)
        except ValueError as err:
            if _is_debug():
                _log(str(err))
            self.handle_self(qname)
            return

        # The check runs on the canonical form so that spellings such as
        # 127.0.0.02 cannot slip past a blacklist entry for 127.0.0.2.
        if ip_address in self.blacklisted_ips:
            self.handle_blacklisted(ip_address)
            return

        _write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip_address)
        self.write_name_servers(qname)
        _write('END')

    def handle_nameservers(self, qname):
        """Answer a configured name server's name with its configured address."""
        ip = self.name_servers[qname]
        _write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip)
        _write('END')

    def write_name_servers(self, qname):
        """Write one NS record for ``qname`` per configured name server."""
        for nameServer in self.name_servers:
            _write('DATA', qname, 'IN', 'NS', self.ttl, self.id, nameServer)

    def handle_soa(self, qname):
        """Answer ``qname`` with the configured SOA record."""
        _write('DATA', qname, 'IN', 'SOA', self.ttl, self.id, self.soa)
        _write('END')

    def handle_unknown(self, qtype, qname):
        """Report a query this backend does not serve and end the answer."""
        _write('LOG', 'Unknown type: %s, domain: %s' % (qtype, qname))
        _write('END')

    def handle_blacklisted(self, ip_address):
        """Report a blacklisted address and end the answer without data."""
        _write('LOG', 'Blacklisted: %s' % ip_address)
        _write('END')

    def _in_domain(self, qname):
        """Return True when ``qname`` is the domain itself or a name below it.

        The match is made on a label boundary, so ``evillcl.io`` is not
        treated as part of ``lcl.io``.
        """
        return qname == self.domain or qname.endswith('.' + self.domain)

    @staticmethod
    def _canonical_ip(parts):
        """Turn four decimal labels into a canonical dotted-quad string.

        :param parts: the four labels taken from the query name.
        :return: the address with leading zeros removed from each octet.
        :raises ValueError: when a label is not one to three digits or is
            larger than 255; the message names the offending label.
        """
        octets = []
        for part in parts:
            if re.match(r'^\d{1,3}$', part) is None:
                raise ValueError('%s is not a number' % part)
            value = int(part)
            if value > 255:
                raise ValueError('%d is too big/small' % value)
            octets.append(str(value))
        return '.'.join(octets)

    def _get_config_filename(self):
        """Return the path of ``backend.conf`` next to this module's real file."""
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'backend.conf')
+
+
if __name__ == '__main__':
    # Script entry point: load the configuration, then serve queries.
    service = DynamicBackend()
    service.configure()
    service.run()
--- /dev/null
+import collections
+import os
+import sys
+import unittest
+
+from assertpy import assert_that
+from mock.mock import patch, call
+
+from nipio.backend import DynamicBackend
+
+
class DynamicBackendTest(unittest.TestCase):
    """Tests for ``DynamicBackend``, run against a mocked ``sys`` module.

    stdin is scripted through ``readline`` side effects and every write to
    stdout is recorded, so each test can assert the exact sequence of fields,
    separators and line terminators the backend produced.
    """

    def setUp(self):
        self.mock_sys_patcher = patch("nipio.backend.sys")
        self.mock_sys = self.mock_sys_patcher.start()
        # Registered as a cleanup so the patch is undone even if the rest of
        # setUp, the test, or tearDown fails.
        self.addCleanup(self.mock_sys_patcher.stop)

        # Let the backend's log output reach the real stderr.
        self.mock_sys.stderr.write = sys.stderr.write

        # Turn on debug logging through a patch so the original function is
        # restored after the test instead of staying replaced for the rest of
        # the test run.
        debug_patcher = patch("nipio.backend._is_debug", return_value=True)
        debug_patcher.start()
        self.addCleanup(debug_patcher.stop)

    def tearDown(self):
        sys.stderr.flush()

    def test_backend_ends_response_to_ANY_request_if_ip_is_blacklisted(self):
        self._assert_query_answered(
            "subdomain.127.0.0.2.lcl.io", "ANY",
            ["LOG", "Blacklisted: 127.0.0.2"],
        )

    def test_backend_ends_response_to_A_request_if_ip_is_blacklisted(self):
        self._assert_query_answered(
            "subdomain.127.0.0.2.lcl.io", "A",
            ["LOG", "Blacklisted: 127.0.0.2"],
        )

    def test_backend_responds_to_ANY_request_with_valid_ip(self):
        self._assert_query_answered(
            "subdomain.127.0.0.1.lcl.io", "ANY",
            *self._address_and_ns_records("subdomain.127.0.0.1.lcl.io", "127.0.0.1")
        )

    def test_backend_responds_to_A_request_with_valid_ip(self):
        self._assert_query_answered(
            "subdomain.127.0.0.1.lcl.io", "A",
            *self._address_and_ns_records("subdomain.127.0.0.1.lcl.io", "127.0.0.1")
        )

    def test_backend_responds_to_invalid_ip_in_ANY_request_with_self_ip(self):
        self._assert_query_answered(
            "subdomain.127.0.1.lcl.io", "ANY",
            *self._address_and_ns_records("subdomain.127.0.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_invalid_ip_in_A_request_with_self(self):
        self._assert_query_answered(
            "subdomain.127.0.1.lcl.io", "A",
            *self._address_and_ns_records("subdomain.127.0.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_short_ip_in_ANY_request_with_self_ip(self):
        self._assert_query_answered(
            "127.0.1.lcl.io", "ANY",
            *self._address_and_ns_records("127.0.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_short_ip_in_A_request_with_self(self):
        self._assert_query_answered(
            "127.0.1.lcl.io", "A",
            *self._address_and_ns_records("127.0.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_large_ip_in_ANY_request_with_self(self):
        self._assert_query_answered(
            "subdomain.127.0.300.1.lcl.io", "ANY",
            *self._address_and_ns_records("subdomain.127.0.300.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_large_ip_in_A_request_with_self(self):
        self._assert_query_answered(
            "subdomain.127.0.300.1.lcl.io", "A",
            *self._address_and_ns_records("subdomain.127.0.300.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_string_in_ip_in_ANY_request_with_self(self):
        # The query name is sent in mixed case; the answer uses lower case.
        self._assert_query_answered(
            "subdomain.127.0.STRING.1.lcl.io", "ANY",
            *self._address_and_ns_records("subdomain.127.0.string.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_string_in_ip_in_A_request_with_self(self):
        self._assert_query_answered(
            "subdomain.127.0.STRING.1.lcl.io", "A",
            *self._address_and_ns_records("subdomain.127.0.string.1.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_no_ip_in_ANY_request_with_self(self):
        # A name with no IP labels at all (previously this repeated the
        # partial-IP name used by the invalid_ip tests).
        self._assert_query_answered(
            "subdomain.lcl.io", "ANY",
            *self._address_and_ns_records("subdomain.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_no_ip_in_A_request_with_self(self):
        self._assert_query_answered(
            "subdomain.lcl.io", "A",
            *self._address_and_ns_records("subdomain.lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_self_domain_to_A_request(self):
        self._assert_query_answered(
            "lcl.io", "A",
            *self._address_and_ns_records("lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_self_domain_to_ANY_request(self):
        self._assert_query_answered(
            "lcl.io", "ANY",
            *self._address_and_ns_records("lcl.io", "127.0.0.33")
        )

    def test_backend_responds_to_name_servers_A_request_with_valid_ip(self):
        self._assert_query_answered(
            "ns1.lcl.io", "A",
            ["DATA", "ns1.lcl.io", "IN", "A", "200", "22", "127.0.0.34"],
        )

    def test_backend_responds_to_name_servers_ANY_request_with_valid_ip(self):
        self._assert_query_answered(
            "ns2.lcl.io", "ANY",
            ["DATA", "ns2.lcl.io", "IN", "A", "200", "22", "127.0.0.35"],
        )

    def test_backend_responds_to_SOA_request_for_self(self):
        self._assert_query_answered("lcl.io", "SOA", self._soa_record("lcl.io"))

    def test_backend_responds_to_SOA_request_for_valid_ip(self):
        self._assert_query_answered(
            "subdomain.127.0.0.1.lcl.io", "SOA",
            self._soa_record("subdomain.127.0.0.1.lcl.io"),
        )

    def test_backend_responds_to_SOA_request_for_invalid_ip(self):
        self._assert_query_answered(
            "subdomain.127.0.1.lcl.io", "SOA",
            self._soa_record("subdomain.127.0.1.lcl.io"),
        )

    def test_backend_responds_to_SOA_request_for_no_ip(self):
        self._assert_query_answered(
            "subdomain.lcl.io", "SOA",
            self._soa_record("subdomain.lcl.io"),
        )

    def test_backend_responds_to_SOA_request_for_nameserver(self):
        self._assert_query_answered(
            "ns1.lcl.io", "SOA",
            self._soa_record("ns1.lcl.io"),
        )

    def test_backend_responds_to_A_request_for_unknown_domain_with_invalid_response(self):
        self._assert_query_answered(
            "unknown.domain", "A",
            ["LOG", "Unknown type: A, domain: unknown.domain"],
        )

    def test_backend_responds_to_invalid_request_with_invalid_response(self):
        self._assert_query_answered(
            "lcl.io", "INVALID",
            ["LOG", "Unknown type: INVALID, domain: lcl.io"],
        )

    def test_backend_responds_to_invalid_command_with_fail(self):
        self._send_commands(["INVALID", "COMMAND"])

        self._run_backend()

        # A FAIL answer is a single line with no END terminator after it.
        calls = [
            call("OK"),
            call("\t"),
            call("We are good"),
            call("\n"),

            call("FAIL"),
            call("\n"),
        ]

        self.mock_sys.stdout.write.assert_has_calls(calls)
        assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))

        assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(2)

    def test_configure_with_full_config(self):
        backend = self._configure_backend()

        assert_that(backend.id).is_equal_to("55")
        assert_that(backend.ip_address).is_equal_to("127.0.0.40")
        assert_that(backend.domain).is_equal_to("lcl.io")
        assert_that(backend.ttl).is_equal_to("1000")
        assert_that(backend.name_servers).is_equal_to({"ns1.lcl.io": "127.0.0.41", "ns2.lcl.io": "127.0.0.42"})
        assert_that(backend.blacklisted_ips).is_equal_to(["10.0.0.100"])
        assert_that(backend.soa).is_equal_to("ns1.lcl.io emailaddress@lcl.io 55")

    def test_configure_with_config_missing_blacklists(self):
        backend = self._configure_backend(filename="backend_test_no_blacklist.conf")

        assert_that(backend.blacklisted_ips).is_empty()

    def _assert_query_answered(self, qname, qtype, *responses):
        """Send one ``Q`` line, run the backend, and check its answer lines."""
        self._send_commands(["Q", qname, "IN", qtype, "1", "127.0.0.1"])

        self._run_backend()

        self._assert_expected_responses(*responses)

    @staticmethod
    def _address_and_ns_records(qname, ip_address):
        """Expected A record for ``qname`` followed by both NS records."""
        return [
            ["DATA", qname, "IN", "A", "200", "22", ip_address],
            ["DATA", qname, "IN", "NS", "200", "22", "ns1.lcl.io"],
            ["DATA", qname, "IN", "NS", "200", "22", "ns2.lcl.io"],
        ]

    @staticmethod
    def _soa_record(qname):
        """Expected SOA record for ``qname``."""
        return ["DATA", qname, "IN", "SOA", "200", "22", "MY_SOA"]

    def _run_backend(self):
        backend = self._create_backend()
        backend.run()

    def _send_commands(self, *commands):
        """Script stdin: the handshake, the given commands, then ``END``."""
        commands_to_send = ["HELO\t1\n"]
        commands_to_send.extend("\t".join(command) + "\n" for command in commands)
        commands_to_send.append("END\n")

        self.mock_sys.stdin.readline.side_effect = commands_to_send

    def _assert_expected_responses(self, *responses):
        """Check stdout got the handshake reply, ``responses``, then ``END``."""
        calls = [
            call("OK"),
            call("\t"),
            call("We are good"),
            call("\n"),
        ]

        for response in responses:
            # Interleave the fields with tabs and finish the line.
            tab_separated = ["\t"] * (len(response) * 2 - 1)
            tab_separated[0::2] = response
            tab_separated.append("\n")

            calls.extend([call(response_item) for response_item in tab_separated])

        calls.extend([
            call("END"),
            call("\n"),
        ])

        self.mock_sys.stdout.write.assert_has_calls(calls)
        assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))

        # One flush per line: the handshake reply, each response, and END.
        assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(len(responses) + 2)

    @staticmethod
    def _create_backend():
        """Build a backend with fixed settings, bypassing ``configure``."""
        backend = DynamicBackend()
        backend.id = "22"
        backend.soa = "MY_SOA"
        backend.ip_address = "127.0.0.33"
        backend.ttl = "200"
        # Ordered so the NS records come out in a predictable order.
        backend.name_servers = collections.OrderedDict([
            ("ns1.lcl.io", "127.0.0.34"),
            ("ns2.lcl.io", "127.0.0.35"),
        ])
        backend.domain = "lcl.io"
        backend.blacklisted_ips = ["127.0.0.2"]
        return backend

    def _configure_backend(self, filename="backend_test.conf"):
        """Run ``configure`` against a config file stored next to this test."""
        backend = DynamicBackend()
        backend._get_config_filename = lambda: self._get_test_config_filename(filename)
        backend.configure()
        return backend

    def _get_test_config_filename(self, filename):
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), filename)
+
+++ /dev/null
-#!/usr/bin/python
-
-import ConfigParser
-import os
-import re
-import sys
-
-DEBUG = 0
-
-
-def log(msg):
- sys.stderr.write('backend (%s): %s\n' % (os.getpid(), msg))
-
-
-def write(*l):
- args = len(l)
- c = 0
- for a in l:
- c += 1
- if DEBUG:
- log('writing: %s' % a)
- sys.stdout.write(a)
- if c < args:
- if DEBUG:
- log('writetab')
- sys.stdout.write('\t')
- if DEBUG:
- log('writenewline')
- sys.stdout.write('\n')
- sys.stdout.flush()
-
-
-def get_next():
- if DEBUG:
- log('reading now')
- l = sys.stdin.readline()
- if DEBUG:
- log('read line: %s' % l)
- return l.strip().split('\t')
-
-
-class DynamicBackend:
- def __init__(self):
- self.id = ''
- self.soa = ''
- self.domain = ''
- self.ip_address = ''
- self.ttl = ''
- self.name_servers = {}
-
- def configure(self):
- fname = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'backend.conf')
- if not os.path.exists(fname):
- log('%s does not exist' % fname)
- sys.exit(1)
-
- fp = open(fname)
- config = ConfigParser.ConfigParser()
- config.readfp(fp)
- fp.close()
-
- self.id = config.get('soa', 'id')
- self.soa = '%s %s %s' % (config.get('soa', 'ns'), config.get('soa', 'hostmaster'), self.id)
- self.domain = config.get('main', 'domain')
- self.ip_address = config.get('main', 'ipaddress')
- self.ttl = config.get('main', 'ttl')
-
- for entry in config.items('nameservers'):
- self.name_servers[entry[0]] = entry[1]
-
- log('Name servers: %s' % self.name_servers)
- log('ID: %s' % self.id)
- log('TTL %s' % self.ttl)
- log('SOA: %s' % self.soa)
- log('IP Address: %s' % self.ip_address)
- log('DOMAIN: %s' % self.domain)
-
- def run(self):
- log('starting up')
- handshake = get_next()
- if handshake[1] != '1':
- log('Not version 1: %s' % handshake)
- sys.exit(1)
- write('OK', 'We are good')
- log('Done handshake')
-
- while True:
- cmd = get_next()
- if DEBUG:
- log(cmd)
-
- if len(cmd) < 6:
- log('did not understand: %s' % cmd)
- write('FAIL')
- continue
-
- qname = cmd[1].lower()
- qtype = cmd[3]
-
- if (qtype == 'A' or qtype == 'ANY') and qname.endswith(self.domain):
- if qname == self.domain:
- self.handle_self(self.domain)
- elif qname in self.name_servers:
- self.handle_nameservers(qname)
- else:
- self.handle_subdomains(qname)
- elif qtype == 'SOA' and qname.endswith(self.domain):
- self.handle_soa(qname)
- else:
- self.handle_unknown(qtype, qname)
-
- def handle_self(self, name):
- write('DATA', name, 'IN', 'A', self.ttl, self.id, self.ip_address)
- self.write_name_servers(name)
- write('END')
-
- def handle_subdomains(self, qname):
- subdomain = qname[0:qname.find(self.domain) - 1]
-
- subparts = subdomain.split('.')
- if len(subparts) < 4:
- if DEBUG:
- log('subparts less than 4')
- self.handle_self(qname)
- return
-
- ipaddress = subparts[-4:]
- if DEBUG:
- log('ip: %s' % ipaddress)
- for part in ipaddress:
- if re.match('^\d{1,3}$', part) is None:
- if DEBUG:
- log('%s is not a number' % part)
- self.handle_self(qname)
- return
- parti = int(part)
- if parti < 0 or parti > 255:
- if DEBUG:
- log('%d is too big/small' % parti)
- self.handle_self(qname)
- return
-
- write('DATA', qname, 'IN', 'A', self.ttl, self.id, '%s.%s.%s.%s' % (ipaddress[0], ipaddress[1], ipaddress[2], ipaddress[3]))
- self.write_name_servers(qname)
- write('END')
-
- def handle_nameservers(self, qname):
- ip = self.name_servers[qname]
- write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip)
- write('END')
-
- def write_name_servers(self, qname):
- for nameServer in self.name_servers:
- write('DATA', qname, 'IN', 'NS', self.ttl, self.id, nameServer)
-
- def handle_soa(self, qname):
- write('DATA', qname, 'IN', 'SOA', self.ttl, self.id, self.soa)
- write('END')
-
- def handle_unknown(self, qtype, qname):
- write('LOG', 'Unknown type: %s, domain: %s' % (qtype, qname))
- write('END')
-
-
-if __name__ == '__main__':
- backend = DynamicBackend()
- backend.configure()
- backend.run()