#!/usr/bin/python
"""DNS backend speaking a tab-separated line protocol on stdin/stdout.

Names of the form ``<anything>.<a>.<b>.<c>.<d>.<domain>`` are answered with the
IPv4 address ``a.b.c.d`` embedded in the name; every other name under the
configured domain is answered with the configured default address.

NOTE(review): the HELO / Q / DATA / END / FAIL exchange looks like the PowerDNS
pipe-backend protocol, version 1 -- confirm against the deployment.
"""

import os
import re
import sys

try:
    import configparser  # Python 3 module name
except ImportError:
    import ConfigParser as configparser  # Python 2 module name

# One IPv4 octet as it appears in a query label.  [0-9] (not \d) so non-ASCII
# digits are never accepted on Python 3; \Z so a trailing newline cannot match.
_OCTET_RE = re.compile(r'^[0-9]{1,3}\Z')


def _is_debug():
    """Return True to enable verbose per-line logging on stderr."""
    return False


def _log(msg):
    """Write *msg* to stderr, prefixed with this process's pid."""
    sys.stderr.write('backend (%s): %s\n' % (os.getpid(), msg))


def _write(*l):
    """Write one response line: the given fields joined by tabs, then flush.

    Every field and separator goes out through its own ``sys.stdout.write``
    call; the unit tests assert on those individual calls, so do not collapse
    this into a single joined write without updating them.
    """
    last = len(l) - 1
    for position, field in enumerate(l):
        if _is_debug():
            _log('writing: %s' % field)
        sys.stdout.write(field)
        if position < last:
            if _is_debug():
                _log('writetab')
            sys.stdout.write('\t')
    if _is_debug():
        _log('writenewline')
    sys.stdout.write('\n')
    sys.stdout.flush()


def _get_next():
    """Read the next input line and return its tab-separated fields.

    Returns ``None`` when stdin is at end-of-file, so callers can tell a
    closed pipe apart from an empty line (which yields ``['']``).
    """
    if _is_debug():
        _log('reading now')
    line = sys.stdin.readline()
    if _is_debug():
        _log('read line: %s' % line)
    if not line:
        return None
    return line.strip().split('\t')


class DynamicBackend(object):
    """Answers A/ANY/SOA queries for one domain from names that embed an IP."""

    def __init__(self):
        self.id = ''               # SOA serial; also sent as the id field of every DATA line
        self.soa = ''              # "<ns> <hostmaster> <id>" SOA content
        self.domain = ''           # domain this backend answers for
        self.ip_address = ''       # default address for names without an embedded IP
        self.ttl = ''              # TTL sent with every record
        self.name_servers = {}     # name server host name -> its IP address
        self.blacklisted_ips = []  # embedded IPs that must never be answered

    def configure(self):
        """Load settings from the file named by ``_get_config_filename()``.

        Exits the process with status 1 when the file does not exist.  The
        ``[blacklist]`` section is optional; a missing ``[main]``, ``[soa]`` or
        ``[nameservers]`` section/option raises the corresponding
        ``configparser`` error.
        """
        fname = self._get_config_filename()
        if not os.path.exists(fname):
            _log('%s does not exist' % fname)
            sys.exit(1)

        config = configparser.ConfigParser()
        with open(fname) as fp:
            # read_file() replaced readfp(), which newer Python 3 no longer has;
            # Python 2 only offers readfp().
            read = getattr(config, 'read_file', None) or config.readfp
            read(fp)

        self.id = config.get('soa', 'id')
        self.soa = '%s %s %s' % (config.get('soa', 'ns'), config.get('soa', 'hostmaster'), self.id)
        self.domain = config.get('main', 'domain')
        self.ip_address = config.get('main', 'ipaddress')
        self.ttl = config.get('main', 'ttl')

        for name, address in config.items('nameservers'):
            self.name_servers[name] = address

        if config.has_section("blacklist"):
            # Option names are free-form descriptions; only the values matter.
            for _description, address in config.items("blacklist"):
                self.blacklisted_ips.append(address)

        _log('Name servers: %s' % self.name_servers)
        _log('ID: %s' % self.id)
        _log('TTL %s' % self.ttl)
        _log('SOA: %s' % self.soa)
        _log('IP Address: %s' % self.ip_address)
        _log('DOMAIN: %s' % self.domain)
        _log("Blacklist: %s" % self.blacklisted_ips)

    def run(self):
        """Perform the version handshake, then answer queries until END or EOF.

        Exits with status 1 when the handshake line is missing, too short, or
        does not announce version ``1``.
        """
        _log('starting up')
        handshake = _get_next()
        if handshake is None or len(handshake) < 2 or handshake[1] != '1':
            _log('Not version 1: %s' % handshake)
            sys.exit(1)
        _write('OK', 'We are good')
        _log('Done handshake')

        while True:
            cmd = _get_next()
            if _is_debug():
                _log("cmd: %s" % cmd)

            if cmd is None:
                # The peer closed the pipe; without this check the loop would
                # answer FAIL forever on the empty reads.
                _log("input closed")
                break

            if cmd[0] == "END":
                _log("completing")
                break

            if len(cmd) < 6:
                _log('did not understand: %s' % cmd)
                _write('FAIL')
                continue

            qname = cmd[1].lower()
            qtype = cmd[3]
            in_domain = self._is_in_domain(qname)

            if in_domain and qtype in ('A', 'ANY'):
                if qname == self.domain:
                    self.handle_self(self.domain)
                elif qname in self.name_servers:
                    self.handle_nameservers(qname)
                else:
                    self.handle_subdomains(qname)
            elif in_domain and qtype == 'SOA':
                self.handle_soa(qname)
            else:
                self.handle_unknown(qtype, qname)

    def _is_in_domain(self, qname):
        """Return True if *qname* is the domain itself or a name beneath it.

        Matching on the label boundary keeps look-alikes such as
        ``evil<domain>`` from being treated as part of the domain.
        """
        return qname == self.domain or qname.endswith('.' + self.domain)

    def handle_self(self, name):
        """Answer *name* with the default address plus the NS records."""
        _write('DATA', name, 'IN', 'A', self.ttl, self.id, self.ip_address)
        self.write_name_servers(name)
        _write('END')

    def handle_subdomains(self, qname):
        """Answer *qname* (a name strictly below the domain) from its labels.

        The four labels directly in front of the domain are read as an IPv4
        address.  If they do not form one, the default address is returned; if
        they form a blacklisted address, only a LOG line is returned.
        """
        # Cut ".<domain>" off the end.  Slicing by length (rather than find())
        # stays correct when the domain text also occurs earlier in the name.
        subdomain = qname[:-(len(self.domain) + 1)]

        subparts = subdomain.split('.')
        if len(subparts) < 4:
            if _is_debug():
                _log('subparts less than 4')
            self.handle_self(qname)
            return

        ip_address_parts = subparts[-4:]
        if _is_debug():
            _log('ip: %s' % ip_address_parts)

        octets = []
        for part in ip_address_parts:
            if _OCTET_RE.match(part) is None:
                if _is_debug():
                    _log('%s is not a number' % part)
                self.handle_self(qname)
                return
            parti = int(part)
            if parti > 255:
                if _is_debug():
                    _log('%d is too big/small' % parti)
                self.handle_self(qname)
                return
            # Normalise ("01" -> "1") so a zero-padded spelling can neither
            # slip past the blacklist nor end up in the answer.
            octets.append(str(parti))

        ip_address = ".".join(octets)
        if ip_address in self.blacklisted_ips:
            self.handle_blacklisted(ip_address)
            return

        _write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip_address)
        self.write_name_servers(qname)
        _write('END')

    def handle_nameservers(self, qname):
        """Answer a configured name server's own name with its address."""
        ip = self.name_servers[qname]
        _write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip)
        _write('END')

    def write_name_servers(self, qname):
        """Write one NS record for *qname* per configured name server."""
        for nameServer in self.name_servers:
            _write('DATA', qname, 'IN', 'NS', self.ttl, self.id, nameServer)

    def handle_soa(self, qname):
        """Answer *qname* with the configured SOA content."""
        _write('DATA', qname, 'IN', 'SOA', self.ttl, self.id, self.soa)
        _write('END')

    def handle_unknown(self, qtype, qname):
        """Answer an unsupported type or foreign name with a LOG line only."""
        _write('LOG', 'Unknown type: %s, domain: %s' % (qtype, qname))
        _write('END')

    def handle_blacklisted(self, ip_address):
        """Answer a blacklisted embedded address with a LOG line only."""
        _write('LOG', 'Blacklisted: %s' % ip_address)
        _write('END')

    def _get_config_filename(self):
        """Return the path of ``backend.conf`` next to this module."""
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'backend.conf')


if __name__ == '__main__':
    backend = DynamicBackend()
    backend.configure()
    backend.run()
import collections
import os
import sys
import unittest

from assertpy import assert_that
from mock.mock import patch, call

from nipio.backend import DynamicBackend


class DynamicBackendTest(unittest.TestCase):
    """Drives ``DynamicBackend`` through a mocked ``nipio.backend.sys``.

    Queries are fed through the mocked ``stdin.readline`` and the answer is
    checked as the exact sequence of ``stdout.write`` calls it produced.
    """

    # Address the backend built by _create_backend() uses for names that carry
    # no usable embedded IP.
    SELF_IP = "127.0.0.33"

    def setUp(self):
        self.mock_sys_patcher = patch("nipio.backend.sys")
        self.mock_sys = self.mock_sys_patcher.start()
        # addCleanup (rather than tearDown) so the patch is undone even when
        # the rest of setUp fails.
        self.addCleanup(self.mock_sys_patcher.stop)

        # Keep the backend's log output visible on the real stderr.
        self.mock_sys.stderr.write = sys.stderr.write

        # Turn on debug logging for this test only; patching (instead of
        # assigning to the module attribute) restores the original afterwards.
        debug_patcher = patch("nipio.backend._is_debug", return_value=True)
        debug_patcher.start()
        self.addCleanup(debug_patcher.stop)

    def tearDown(self):
        sys.stderr.flush()

    def test_backend_ends_response_to_ANY_request_if_ip_is_blacklisted(self):
        self._query("subdomain.127.0.0.2.lcl.io", "ANY")

        self._assert_expected_responses(["LOG", "Blacklisted: 127.0.0.2"])

    def test_backend_ends_response_to_A_request_if_ip_is_blacklisted(self):
        self._query("subdomain.127.0.0.2.lcl.io", "A")

        self._assert_expected_responses(["LOG", "Blacklisted: 127.0.0.2"])

    def test_backend_responds_to_ANY_request_with_valid_ip(self):
        self._query("subdomain.127.0.0.1.lcl.io", "ANY")

        self._assert_answered_with("subdomain.127.0.0.1.lcl.io", "127.0.0.1")

    def test_backend_responds_to_A_request_with_valid_ip(self):
        self._query("subdomain.127.0.0.1.lcl.io", "A")

        self._assert_answered_with("subdomain.127.0.0.1.lcl.io", "127.0.0.1")

    def test_backend_responds_to_invalid_ip_in_ANY_request_with_self_ip(self):
        self._query("subdomain.127.0.1.lcl.io", "ANY")

        self._assert_answered_with("subdomain.127.0.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_invalid_ip_in_A_request_with_self(self):
        self._query("subdomain.127.0.1.lcl.io", "A")

        self._assert_answered_with("subdomain.127.0.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_short_ip_in_ANY_request_with_self_ip(self):
        self._query("127.0.1.lcl.io", "ANY")

        self._assert_answered_with("127.0.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_short_ip_in_A_request_with_self(self):
        self._query("127.0.1.lcl.io", "A")

        self._assert_answered_with("127.0.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_large_ip_in_ANY_request_with_self(self):
        self._query("subdomain.127.0.300.1.lcl.io", "ANY")

        self._assert_answered_with("subdomain.127.0.300.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_large_ip_in_A_request_with_self(self):
        self._query("subdomain.127.0.300.1.lcl.io", "A")

        self._assert_answered_with("subdomain.127.0.300.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_string_in_ip_in_ANY_request_with_self(self):
        self._query("subdomain.127.0.STRING.1.lcl.io", "ANY")

        # The backend lower-cases the query name before answering.
        self._assert_answered_with("subdomain.127.0.string.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_string_in_ip_in_A_request_with_self(self):
        self._query("subdomain.127.0.STRING.1.lcl.io", "A")

        self._assert_answered_with("subdomain.127.0.string.1.lcl.io", self.SELF_IP)

    def test_backend_responds_to_no_ip_in_ANY_request_with_self(self):
        # A name with no IP labels at all (previously this repeated the
        # invalid-ip query, leaving the case untested).
        self._query("subdomain.lcl.io", "ANY")

        self._assert_answered_with("subdomain.lcl.io", self.SELF_IP)

    def test_backend_responds_to_no_ip_in_A_request_with_self(self):
        self._query("subdomain.lcl.io", "A")

        self._assert_answered_with("subdomain.lcl.io", self.SELF_IP)

    def test_backend_responds_to_self_domain_to_A_request(self):
        self._query("lcl.io", "A")

        self._assert_answered_with("lcl.io", self.SELF_IP)

    def test_backend_responds_to_self_domain_to_ANY_request(self):
        self._query("lcl.io", "ANY")

        self._assert_answered_with("lcl.io", self.SELF_IP)

    def test_backend_responds_to_name_servers_A_request_with_valid_ip(self):
        self._query("ns1.lcl.io", "A")

        self._assert_expected_responses(
            ["DATA", "ns1.lcl.io", "IN", "A", "200", "22", "127.0.0.34"],
        )

    def test_backend_responds_to_name_servers_ANY_request_with_valid_ip(self):
        self._query("ns2.lcl.io", "ANY")

        self._assert_expected_responses(
            ["DATA", "ns2.lcl.io", "IN", "A", "200", "22", "127.0.0.35"],
        )

    def test_backend_responds_to_SOA_request_for_self(self):
        self._query("lcl.io", "SOA")

        self._assert_soa_for("lcl.io")

    def test_backend_responds_to_SOA_request_for_valid_ip(self):
        self._query("subdomain.127.0.0.1.lcl.io", "SOA")

        self._assert_soa_for("subdomain.127.0.0.1.lcl.io")

    def test_backend_responds_to_SOA_request_for_invalid_ip(self):
        self._query("subdomain.127.0.1.lcl.io", "SOA")

        self._assert_soa_for("subdomain.127.0.1.lcl.io")

    def test_backend_responds_to_SOA_request_for_no_ip(self):
        self._query("subdomain.lcl.io", "SOA")

        self._assert_soa_for("subdomain.lcl.io")

    def test_backend_responds_to_SOA_request_for_nameserver(self):
        self._query("ns1.lcl.io", "SOA")

        self._assert_soa_for("ns1.lcl.io")

    def test_backend_responds_to_A_request_for_unknown_domain_with_invalid_response(self):
        self._query("unknown.domain", "A")

        self._assert_expected_responses(
            ["LOG", "Unknown type: A, domain: unknown.domain"]
        )

    def test_backend_responds_to_invalid_request_with_invalid_response(self):
        self._query("lcl.io", "INVALID")

        self._assert_expected_responses(
            ["LOG", "Unknown type: INVALID, domain: lcl.io"]
        )

    def test_backend_responds_to_invalid_command_with_fail(self):
        self._send_commands(["INVALID", "COMMAND"])

        self._run_backend()

        calls = [
            call("OK"),
            call("\t"),
            call("We are good"),
            call("\n"),

            call("FAIL"),
            call("\n"),
        ]

        self.mock_sys.stdout.write.assert_has_calls(calls)
        assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))

        # One flush for the handshake reply, one for FAIL.
        assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(2)

    def test_configure_with_full_config(self):
        backend = self._configure_backend()

        assert_that(backend.id).is_equal_to("55")
        assert_that(backend.ip_address).is_equal_to("127.0.0.40")
        assert_that(backend.domain).is_equal_to("lcl.io")
        assert_that(backend.ttl).is_equal_to("1000")
        assert_that(backend.name_servers).is_equal_to({"ns1.lcl.io": "127.0.0.41", "ns2.lcl.io": "127.0.0.42"})
        assert_that(backend.blacklisted_ips).is_equal_to(["10.0.0.100"])
        assert_that(backend.soa).is_equal_to("ns1.lcl.io emailaddress@lcl.io 55")

    def test_configure_with_config_missing_blacklists(self):
        backend = self._configure_backend(filename="backend_test_no_blacklist.conf")

        assert_that(backend.blacklisted_ips).is_empty()

    def _query(self, qname, qtype):
        """Queue a single ``Q`` command for *qname*/*qtype* and run the backend."""
        self._send_commands(["Q", qname, "IN", qtype, "1", "127.0.0.1"])
        self._run_backend()

    def _assert_answered_with(self, qname, ip_address):
        """Expect an A record with *ip_address* followed by both NS records."""
        self._assert_expected_responses(
            ["DATA", qname, "IN", "A", "200", "22", ip_address],
            ["DATA", qname, "IN", "NS", "200", "22", "ns1.lcl.io"],
            ["DATA", qname, "IN", "NS", "200", "22", "ns2.lcl.io"],
        )

    def _assert_soa_for(self, qname):
        """Expect the single SOA record of the test backend for *qname*."""
        self._assert_expected_responses(
            ["DATA", qname, "IN", "SOA", "200", "22", "MY_SOA"]
        )

    def _run_backend(self):
        backend = self._create_backend()
        backend.run()

    def _send_commands(self, *commands):
        """Queue the handshake, the given commands and a final END on stdin."""
        commands_to_send = ["HELO\t1\n"]

        for command in commands:
            commands_to_send.append("\t".join(command) + "\n")

        commands_to_send.append("END\n")

        self.mock_sys.stdin.readline.side_effect = commands_to_send

    def _assert_expected_responses(self, *responses):
        """Expect the handshake reply, each response line, then END."""
        calls = [
            call("OK"),
            call("\t"),
            call("We are good"),
            call("\n"),
        ]

        for response in responses:
            # Interleave the fields with tabs: f0, "\t", f1, "\t", ..., fn, "\n".
            tab_separated = ["\t"] * (len(response) * 2 - 1)
            tab_separated[0::2] = response
            tab_separated.append("\n")

            calls.extend([call(response_item) for response_item in tab_separated])

        calls.extend([
            call("END"),
            call("\n"),
        ])

        self.mock_sys.stdout.write.assert_has_calls(calls)
        assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))

        # One flush per written line: the responses plus handshake reply and END.
        assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(len(responses) + 2)

    @staticmethod
    def _create_backend():
        """Build a backend with fixed settings, bypassing ``configure()``."""
        backend = DynamicBackend()
        backend.id = "22"
        backend.soa = "MY_SOA"
        backend.ip_address = "127.0.0.33"
        backend.ttl = "200"
        # Ordered so the NS records come out in a predictable order.
        backend.name_servers = collections.OrderedDict([
            ("ns1.lcl.io", "127.0.0.34"),
            ("ns2.lcl.io", "127.0.0.35"),
        ])
        backend.domain = "lcl.io"
        backend.blacklisted_ips = ["127.0.0.2"]
        return backend

    def _configure_backend(self, filename="backend_test.conf"):
        """Run ``configure()`` against a config file next to this test module."""
        backend = DynamicBackend()
        backend._get_config_filename = lambda: self._get_test_config_filename(filename)
        backend.configure()
        return backend

    def _get_test_config_filename(self, filename):
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), filename)
+id=55 +# Hostmaster email address +hostmaster=emailaddress@lcl.io +# Name server +ns=ns1.lcl.io + + +# nameservers +[nameservers] +ns1.lcl.io=127.0.0.41 +ns2.lcl.io=127.0.0.42 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..601eca4 --- /dev/null +++ b/setup.py @@ -0,0 +1,9 @@ +# from distutils.core import setup +from setuptools import setup + +setup(name="nip.io", + version="1.0", + packages=["nipio"], + tests_require=["mock", "assertpy"], + test_suite="nipio_tests", + ) diff --git a/src/backend.py b/src/backend.py deleted file mode 100755 index 4e373f1..0000000 --- a/src/backend.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/python - -import ConfigParser -import os -import re -import sys - -DEBUG = 0 - - -def log(msg): - sys.stderr.write('backend (%s): %s\n' % (os.getpid(), msg)) - - -def write(*l): - args = len(l) - c = 0 - for a in l: - c += 1 - if DEBUG: - log('writing: %s' % a) - sys.stdout.write(a) - if c < args: - if DEBUG: - log('writetab') - sys.stdout.write('\t') - if DEBUG: - log('writenewline') - sys.stdout.write('\n') - sys.stdout.flush() - - -def get_next(): - if DEBUG: - log('reading now') - l = sys.stdin.readline() - if DEBUG: - log('read line: %s' % l) - return l.strip().split('\t') - - -class DynamicBackend: - def __init__(self): - self.id = '' - self.soa = '' - self.domain = '' - self.ip_address = '' - self.ttl = '' - self.name_servers = {} - - def configure(self): - fname = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'backend.conf') - if not os.path.exists(fname): - log('%s does not exist' % fname) - sys.exit(1) - - fp = open(fname) - config = ConfigParser.ConfigParser() - config.readfp(fp) - fp.close() - - self.id = config.get('soa', 'id') - self.soa = '%s %s %s' % (config.get('soa', 'ns'), config.get('soa', 'hostmaster'), self.id) - self.domain = config.get('main', 'domain') - self.ip_address = config.get('main', 'ipaddress') - self.ttl = config.get('main', 'ttl') - - for entry in 
config.items('nameservers'): - self.name_servers[entry[0]] = entry[1] - - log('Name servers: %s' % self.name_servers) - log('ID: %s' % self.id) - log('TTL %s' % self.ttl) - log('SOA: %s' % self.soa) - log('IP Address: %s' % self.ip_address) - log('DOMAIN: %s' % self.domain) - - def run(self): - log('starting up') - handshake = get_next() - if handshake[1] != '1': - log('Not version 1: %s' % handshake) - sys.exit(1) - write('OK', 'We are good') - log('Done handshake') - - while True: - cmd = get_next() - if DEBUG: - log(cmd) - - if len(cmd) < 6: - log('did not understand: %s' % cmd) - write('FAIL') - continue - - qname = cmd[1].lower() - qtype = cmd[3] - - if (qtype == 'A' or qtype == 'ANY') and qname.endswith(self.domain): - if qname == self.domain: - self.handle_self(self.domain) - elif qname in self.name_servers: - self.handle_nameservers(qname) - else: - self.handle_subdomains(qname) - elif qtype == 'SOA' and qname.endswith(self.domain): - self.handle_soa(qname) - else: - self.handle_unknown(qtype, qname) - - def handle_self(self, name): - write('DATA', name, 'IN', 'A', self.ttl, self.id, self.ip_address) - self.write_name_servers(name) - write('END') - - def handle_subdomains(self, qname): - subdomain = qname[0:qname.find(self.domain) - 1] - - subparts = subdomain.split('.') - if len(subparts) < 4: - if DEBUG: - log('subparts less than 4') - self.handle_self(qname) - return - - ipaddress = subparts[-4:] - if DEBUG: - log('ip: %s' % ipaddress) - for part in ipaddress: - if re.match('^\d{1,3}$', part) is None: - if DEBUG: - log('%s is not a number' % part) - self.handle_self(qname) - return - parti = int(part) - if parti < 0 or parti > 255: - if DEBUG: - log('%d is too big/small' % parti) - self.handle_self(qname) - return - - write('DATA', qname, 'IN', 'A', self.ttl, self.id, '%s.%s.%s.%s' % (ipaddress[0], ipaddress[1], ipaddress[2], ipaddress[3])) - self.write_name_servers(qname) - write('END') - - def handle_nameservers(self, qname): - ip = 
self.name_servers[qname] - write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip) - write('END') - - def write_name_servers(self, qname): - for nameServer in self.name_servers: - write('DATA', qname, 'IN', 'NS', self.ttl, self.id, nameServer) - - def handle_soa(self, qname): - write('DATA', qname, 'IN', 'SOA', self.ttl, self.id, self.soa) - write('END') - - def handle_unknown(self, qtype, qname): - write('LOG', 'Unknown type: %s, domain: %s' % (qtype, qname)) - write('END') - - -if __name__ == '__main__': - backend = DynamicBackend() - backend.configure() - backend.run()