Add tests and support for blacklisting IPs
authorrs <rs@midearth.co.uk>
Wed, 23 Jan 2019 07:31:23 +0000 (07:31 +0000)
committerrs <rs@midearth.co.uk>
Fri, 8 Feb 2019 07:09:51 +0000 (07:09 +0000)
.gitignore
TODO.txt [new file with mode: 0644]
nipio/__init__.py [new file with mode: 0644]
nipio/backend.conf [moved from src/backend.conf with 86% similarity]
nipio/backend.py [new file with mode: 0755]
nipio_tests/__init__.py [new file with mode: 0644]
nipio_tests/backend_test.conf [new file with mode: 0644]
nipio_tests/backend_test.py [new file with mode: 0644]
nipio_tests/backend_test_no_blacklist.conf [new file with mode: 0644]
setup.py [new file with mode: 0644]
src/backend.py [deleted file]

index f8ea97f..76e930c 100644 (file)
@@ -3,4 +3,6 @@
 *.iws
 .idea
 .DS_Store
 *.iws
 .idea
 .DS_Store
-build
\ No newline at end of file
+build
+nip.io.egg-info
+dist
\ No newline at end of file
diff --git a/TODO.txt b/TODO.txt
new file mode 100644 (file)
index 0000000..70483be
--- /dev/null
+++ b/TODO.txt
@@ -0,0 +1,5 @@
+Docker
+Github
+CI
+PIP
+_write is overly complex??!!?
\ No newline at end of file
diff --git a/nipio/__init__.py b/nipio/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
similarity index 86%
rename from src/backend.conf
rename to nipio/backend.conf
index 41d658e..ee03dc4 100644 (file)
@@ -25,3 +25,6 @@ ns1.lcl.io=127.0.0.1
 ns2.lcl.io=127.0.0.1
 
 
 ns2.lcl.io=127.0.0.1
 
 
+# blacklist
+[blacklist]
+some_description = 10.0.0.1
diff --git a/nipio/backend.py b/nipio/backend.py
new file mode 100755 (executable)
index 0000000..37851c6
--- /dev/null
@@ -0,0 +1,191 @@
+#!/usr/bin/python
+
+import ConfigParser
+import os
+import re
+import sys
+
+
+def _is_debug():
+    return False
+
+
+def _log(msg):
+    sys.stderr.write('backend (%s): %s\n' % (os.getpid(), msg))
+
+
+def _write(*l):
+    args = len(l)
+    c = 0
+    for a in l:
+        c += 1
+        if _is_debug():
+            _log('writing: %s' % a)
+        sys.stdout.write(a)
+        if c < args:
+            if _is_debug():
+                _log('writetab')
+            sys.stdout.write('\t')
+    if _is_debug():
+        _log('writenewline')
+    sys.stdout.write('\n')
+    sys.stdout.flush()
+
+
+def _get_next():
+    if _is_debug():
+        _log('reading now')
+    line = sys.stdin.readline()
+    if _is_debug():
+        _log('read line: %s' % line)
+    return line.strip().split('\t')
+
+
+class DynamicBackend:
+    def __init__(self):
+        self.id = ''
+        self.soa = ''
+        self.domain = ''
+        self.ip_address = ''
+        self.ttl = ''
+        self.name_servers = {}
+        self.blacklisted_ips = []
+
+    def configure(self):
+        fname = self._get_config_filename()
+        if not os.path.exists(fname):
+            _log('%s does not exist' % fname)
+            sys.exit(1)
+
+        with open(fname) as fp:
+            config = ConfigParser.ConfigParser()
+            config.readfp(fp)
+
+        self.id = config.get('soa', 'id')
+        self.soa = '%s %s %s' % (config.get('soa', 'ns'), config.get('soa', 'hostmaster'), self.id)
+        self.domain = config.get('main', 'domain')
+        self.ip_address = config.get('main', 'ipaddress')
+        self.ttl = config.get('main', 'ttl')
+
+        for entry in config.items('nameservers'):
+            self.name_servers[entry[0]] = entry[1]
+
+        if config.has_section("blacklist"):
+            for entry in config.items("blacklist"):
+                self.blacklisted_ips.append(entry[1])
+
+        _log('Name servers: %s' % self.name_servers)
+        _log('ID: %s' % self.id)
+        _log('TTL %s' % self.ttl)
+        _log('SOA: %s' % self.soa)
+        _log('IP Address: %s' % self.ip_address)
+        _log('DOMAIN: %s' % self.domain)
+        _log("Blacklist: %s" % self.blacklisted_ips)
+
+    def run(self):
+        _log('starting up')
+        handshake = _get_next()
+        if handshake[1] != '1':
+            _log('Not version 1: %s' % handshake)
+            sys.exit(1)
+        _write('OK', 'We are good')
+        _log('Done handshake')
+
+        while True:
+            cmd = _get_next()
+            if _is_debug():
+                _log("cmd: %s" % cmd)
+
+            if cmd[0] == "END":
+                _log("completing")
+                break
+
+            if len(cmd) < 6:
+                _log('did not understand: %s' % cmd)
+                _write('FAIL')
+                continue
+
+            qname = cmd[1].lower()
+            qtype = cmd[3]
+
+            if (qtype == 'A' or qtype == 'ANY') and qname.endswith(self.domain):
+                if qname == self.domain:
+                    self.handle_self(self.domain)
+                elif qname in self.name_servers:
+                    self.handle_nameservers(qname)
+                else:
+                    self.handle_subdomains(qname)
+            elif qtype == 'SOA' and qname.endswith(self.domain):
+                self.handle_soa(qname)
+            else:
+                self.handle_unknown(qtype, qname)
+
+    def handle_self(self, name):
+        _write('DATA', name, 'IN', 'A', self.ttl, self.id, self.ip_address)
+        self.write_name_servers(name)
+        _write('END')
+
+    def handle_subdomains(self, qname):
+        subdomain = qname[0:qname.find(self.domain) - 1]
+
+        subparts = subdomain.split('.')
+        if len(subparts) < 4:
+            if _is_debug():
+                _log('subparts less than 4')
+            self.handle_self(qname)
+            return
+
+        ip_address_parts = subparts[-4:]
+        if _is_debug():
+            _log('ip: %s' % ip_address_parts)
+        for part in ip_address_parts:
+            if re.match('^\d{1,3}$', part) is None:
+                if _is_debug():
+                    _log('%s is not a number' % part)
+                self.handle_self(qname)
+                return
+            parti = int(part)
+            if parti < 0 or parti > 255:
+                if _is_debug():
+                    _log('%d is too big/small' % parti)
+                self.handle_self(qname)
+                return
+
+        ip_address = ".".join(ip_address_parts)
+        if ip_address in self.blacklisted_ips:
+            self.handle_blacklisted(ip_address)
+            return
+
+        _write('DATA', qname, 'IN', 'A', self.ttl, self.id, '%s.%s.%s.%s' % (ip_address_parts[0], ip_address_parts[1], ip_address_parts[2], ip_address_parts[3]))
+        self.write_name_servers(qname)
+        _write('END')
+
+    def handle_nameservers(self, qname):
+        ip = self.name_servers[qname]
+        _write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip)
+        _write('END')
+
+    def write_name_servers(self, qname):
+        for nameServer in self.name_servers:
+            _write('DATA', qname, 'IN', 'NS', self.ttl, self.id, nameServer)
+
+    def handle_soa(self, qname):
+        _write('DATA', qname, 'IN', 'SOA', self.ttl, self.id, self.soa)
+        _write('END')
+
+    def handle_unknown(self, qtype, qname):
+        _write('LOG', 'Unknown type: %s, domain: %s' % (qtype, qname))
+        _write('END')
+
+    def handle_blacklisted(self, ip_address):
+        _write('LOG', 'Blacklisted: %s' % ip_address)
+        _write('END')
+
+    def _get_config_filename(self):
+        return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'backend.conf')
+
+
+if __name__ == '__main__':
+    backend = DynamicBackend()
+    backend.configure()
+    backend.run()
diff --git a/nipio_tests/__init__.py b/nipio_tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/nipio_tests/backend_test.conf b/nipio_tests/backend_test.conf
new file mode 100644 (file)
index 0000000..9bdf72b
--- /dev/null
@@ -0,0 +1,30 @@
+[main]
+# main domain
+domain=lcl.io
+
+# default ttl
+ttl=1000
+
+# default IP address for non-wildcard entries
+ipaddress=127.0.0.40
+
+
+# SOA
+[soa]
+# serial number
+id=55
+# Hostmaster email address
+hostmaster=emailaddress@lcl.io
+# Name server
+ns=ns1.lcl.io
+
+
+# nameservers
+[nameservers]
+ns1.lcl.io=127.0.0.41
+ns2.lcl.io=127.0.0.42
+
+
+# blacklist
+[blacklist]
+some_description = 10.0.0.100
diff --git a/nipio_tests/backend_test.py b/nipio_tests/backend_test.py
new file mode 100644 (file)
index 0000000..dc1dd04
--- /dev/null
@@ -0,0 +1,376 @@
+import collections
+import os
+import sys
+import unittest
+
+from assertpy import assert_that
+from mock.mock import patch, call
+
+from nipio.backend import DynamicBackend
+
+
+class DynamicBackendTest(unittest.TestCase):
+    def setUp(self):
+        self.mock_sys_patcher = patch("nipio.backend.sys")
+        self.mock_sys = self.mock_sys_patcher.start()
+
+        self.mock_sys.stderr.write = sys.stderr.write
+
+        import nipio
+        nipio.backend._is_debug = lambda: True
+
+    def tearDown(self):
+        sys.stderr.flush()
+
+        self.mock_sys_patcher.stop()
+
+    def test_backend_ends_response_to_ANY_request_if_ip_is_blacklisted(self):
+        self._send_commands(["Q", "subdomain.127.0.0.2.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(["LOG", "Blacklisted: 127.0.0.2"])
+
+    def test_backend_ends_response_to_A_request_if_ip_is_blacklisted(self):
+        self._send_commands(["Q", "subdomain.127.0.0.2.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["LOG", "Blacklisted: 127.0.0.2"]
+        )
+
+    def test_backend_responds_to_ANY_request_with_valid_ip(self):
+        self._send_commands(["Q", "subdomain.127.0.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.1"],
+            ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_A_request_with_valid_ip(self):
+        self._send_commands(["Q", "subdomain.127.0.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.1"],
+            ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_invalid_ip_in_ANY_request_with_self_ip(self):
+        self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_invalid_ip_in_A_request_with_self(self):
+        self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_short_ip_in_ANY_request_with_self_ip(self):
+        self._send_commands(["Q", "127.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_short_ip_in_A_request_with_self(self):
+        self._send_commands(["Q", "127.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_large_ip_in_ANY_request_with_self(self):
+        self._send_commands(["Q", "subdomain.127.0.300.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_large_ip_in_A_request_with_self(self):
+        self._send_commands(["Q", "subdomain.127.0.300.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_string_in_ip_in_ANY_request_with_self(self):
+        self._send_commands(["Q", "subdomain.127.0.STRING.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_string_in_ip_in_A_request_with_self(self):
+        self._send_commands(["Q", "subdomain.127.0.STRING.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_no_ip_in_ANY_request_with_self(self):
+        self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_no_ip_in_A_request_with_self(self):
+        self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_self_domain_to_A_request(self):
+        self._send_commands(["Q", "lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_self_domain_to_ANY_request(self):
+        self._send_commands(["Q", "lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
+            ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
+            ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
+        )
+
+    def test_backend_responds_to_name_servers_A_request_with_valid_ip(self):
+        self._send_commands(["Q", "ns1.lcl.io", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "ns1.lcl.io", "IN", "A", "200", "22", "127.0.0.34"],
+        )
+
+    def test_backend_responds_to_name_servers_ANY_request_with_valid_ip(self):
+        self._send_commands(["Q", "ns2.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "ns2.lcl.io", "IN", "A", "200", "22", "127.0.0.35"],
+        )
+
+    def test_backend_responds_to_SOA_request_for_self(self):
+        self._send_commands(["Q", "lcl.io", "IN", "SOA", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
+        )
+
+    def test_backend_responds_to_SOA_request_for_valid_ip(self):
+        self._send_commands(["Q", "subdomain.127.0.0.1.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
+        )
+
+    def test_backend_responds_to_SOA_request_for_invalid_ip(self):
+        self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.127.0.1.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
+        )
+
+    def test_backend_responds_to_SOA_request_for_no_ip(self):
+        self._send_commands(["Q", "subdomain.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "subdomain.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
+        )
+
+    def test_backend_responds_to_SOA_request_for_nameserver(self):
+        self._send_commands(["Q", "ns1.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["DATA", "ns1.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
+        )
+
+    def test_backend_responds_to_A_request_for_unknown_domain_with_invalid_response(self):
+        self._send_commands(["Q", "unknown.domain", "IN", "A", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["LOG", "Unknown type: A, domain: unknown.domain"]
+        )
+
+    def test_backend_responds_to_invalid_request_with_invalid_response(self):
+        self._send_commands(["Q", "lcl.io", "IN", "INVALID", "1", "127.0.0.1"])
+
+        self._run_backend()
+
+        self._assert_expected_responses(
+            ["LOG", "Unknown type: INVALID, domain: lcl.io"]
+        )
+
+    def test_backend_responds_to_invalid_command_with_fail(self):
+        self._send_commands(["INVALID", "COMMAND"])
+
+        self._run_backend()
+
+        calls = [
+            call("OK"),
+            call("\t"),
+            call("We are good"),
+            call("\n"),
+
+            call("FAIL"),
+            call("\n"),
+        ]
+
+        self.mock_sys.stdout.write.assert_has_calls(calls)
+        assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))
+
+        assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(2)
+
+    def test_configure_with_full_config(self):
+        backend = self._configure_backend()
+
+        assert_that(backend.id).is_equal_to("55")
+        assert_that(backend.ip_address).is_equal_to("127.0.0.40")
+        assert_that(backend.domain).is_equal_to("lcl.io")
+        assert_that(backend.ttl).is_equal_to("1000")
+        assert_that(backend.name_servers).is_equal_to({"ns1.lcl.io": "127.0.0.41", "ns2.lcl.io" : "127.0.0.42"})
+        assert_that(backend.blacklisted_ips).is_equal_to(["10.0.0.100"])
+        assert_that(backend.soa).is_equal_to("ns1.lcl.io emailaddress@lcl.io 55")
+
+    def test_configure_with_config_missing_blacklists(self):
+        backend = self._configure_backend(filename="backend_test_no_blacklist.conf")
+
+        assert_that(backend.blacklisted_ips).is_empty()
+
+    def _run_backend(self):
+        backend = self._create_backend()
+        backend.run()
+
+    def _send_commands(self, *commands):
+        commands_to_send = ["HELO\t1\n"]
+
+        for command in commands:
+            commands_to_send.append("\t".join(command) + "\n")
+
+        commands_to_send.append("END\n")
+
+        self.mock_sys.stdin.readline.side_effect = commands_to_send
+
+    def _assert_expected_responses(self, *responses):
+        calls = [
+            call("OK"),
+            call("\t"),
+            call("We are good"),
+            call("\n"),
+        ]
+
+        for response in responses:
+            tab_separated = ["\t"] * (len(response) * 2 - 1)
+            tab_separated[0::2] = response
+            tab_separated.append("\n")
+
+            calls.extend([call(response_item) for response_item in tab_separated])
+
+        calls.extend([
+            call("END"),
+            call("\n"),
+        ])
+
+        self.mock_sys.stdout.write.assert_has_calls(calls)
+        assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))
+
+        assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(len(responses) + 2)
+
+    @staticmethod
+    def _create_backend():
+        backend = DynamicBackend()
+        backend.id = "22"
+        backend.soa = "MY_SOA"
+        backend.ip_address = "127.0.0.33"
+        backend.ttl = "200"
+        backend.name_servers = collections.OrderedDict([
+            ("ns1.lcl.io", "127.0.0.34"),
+            ("ns2.lcl.io", "127.0.0.35"),
+        ])
+        backend.domain = "lcl.io"
+        backend.blacklisted_ips = ["127.0.0.2"]
+        return backend
+
+    def _configure_backend(self, filename="backend_test.conf"):
+        backend = DynamicBackend()
+        backend._get_config_filename = lambda: self._get_test_config_filename(filename)
+        backend.configure()
+        return backend
+
+    def _get_test_config_filename(self, filename):
+        return os.path.join(os.path.dirname(os.path.realpath(__file__)), filename)
+
diff --git a/nipio_tests/backend_test_no_blacklist.conf b/nipio_tests/backend_test_no_blacklist.conf
new file mode 100644 (file)
index 0000000..bf6ba45
--- /dev/null
@@ -0,0 +1,25 @@
+[main]
+# main domain
+domain=lcl.io
+
+# default ttl
+ttl=1000
+
+# default IP address for non-wildcard entries
+ipaddress=127.0.0.40
+
+
+# SOA
+[soa]
+# serial number
+id=55
+# Hostmaster email address
+hostmaster=emailaddress@lcl.io
+# Name server
+ns=ns1.lcl.io
+
+
+# nameservers
+[nameservers]
+ns1.lcl.io=127.0.0.41
+ns2.lcl.io=127.0.0.42
diff --git a/setup.py b/setup.py
new file mode 100644 (file)
index 0000000..601eca4
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,9 @@
+# from distutils.core import setup
+from setuptools import setup
+
+setup(name="nip.io",
+      version="1.0",
+      packages=["nipio"],
+      tests_require=["mock", "assertpy"],
+      test_suite="nipio_tests",
+      )
diff --git a/src/backend.py b/src/backend.py
deleted file mode 100755 (executable)
index 4e373f1..0000000
+++ /dev/null
@@ -1,168 +0,0 @@
-#!/usr/bin/python
-
-import ConfigParser
-import os
-import re
-import sys
-
-DEBUG = 0
-
-
-def log(msg):
-    sys.stderr.write('backend (%s): %s\n' % (os.getpid(), msg))
-
-
-def write(*l):
-    args = len(l)
-    c = 0
-    for a in l:
-        c += 1
-        if DEBUG:
-            log('writing: %s' % a)
-        sys.stdout.write(a)
-        if c < args:
-            if DEBUG:
-                log('writetab')
-            sys.stdout.write('\t')
-    if DEBUG:
-        log('writenewline')
-    sys.stdout.write('\n')
-    sys.stdout.flush()
-
-
-def get_next():
-    if DEBUG:
-        log('reading now')
-    l = sys.stdin.readline()
-    if DEBUG:
-        log('read line: %s' % l)
-    return l.strip().split('\t')
-
-
-class DynamicBackend:
-    def __init__(self):
-        self.id = ''
-        self.soa = ''
-        self.domain = ''
-        self.ip_address = ''
-        self.ttl = ''
-        self.name_servers = {}
-
-    def configure(self):
-        fname = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'backend.conf')
-        if not os.path.exists(fname):
-            log('%s does not exist' % fname)
-            sys.exit(1)
-
-        fp = open(fname)
-        config = ConfigParser.ConfigParser()
-        config.readfp(fp)
-        fp.close()
-
-        self.id = config.get('soa', 'id')
-        self.soa = '%s %s %s' % (config.get('soa', 'ns'), config.get('soa', 'hostmaster'), self.id)
-        self.domain = config.get('main', 'domain')
-        self.ip_address = config.get('main', 'ipaddress')
-        self.ttl = config.get('main', 'ttl')
-
-        for entry in config.items('nameservers'):
-            self.name_servers[entry[0]] = entry[1]
-
-        log('Name servers: %s' % self.name_servers)
-        log('ID: %s' % self.id)
-        log('TTL %s' % self.ttl)
-        log('SOA: %s' % self.soa)
-        log('IP Address: %s' % self.ip_address)
-        log('DOMAIN: %s' % self.domain)
-
-    def run(self):
-        log('starting up')
-        handshake = get_next()
-        if handshake[1] != '1':
-            log('Not version 1: %s' % handshake)
-            sys.exit(1)
-        write('OK', 'We are good')
-        log('Done handshake')
-
-        while True:
-            cmd = get_next()
-            if DEBUG:
-                log(cmd)
-
-            if len(cmd) < 6:
-                log('did not understand: %s' % cmd)
-                write('FAIL')
-                continue
-
-            qname = cmd[1].lower()
-            qtype = cmd[3]
-
-            if (qtype == 'A' or qtype == 'ANY') and qname.endswith(self.domain):
-                if qname == self.domain:
-                    self.handle_self(self.domain)
-                elif qname in self.name_servers:
-                    self.handle_nameservers(qname)
-                else:
-                    self.handle_subdomains(qname)
-            elif qtype == 'SOA' and qname.endswith(self.domain):
-                self.handle_soa(qname)
-            else:
-                self.handle_unknown(qtype, qname)
-
-    def handle_self(self, name):
-        write('DATA', name, 'IN', 'A', self.ttl, self.id, self.ip_address)
-        self.write_name_servers(name)
-        write('END')
-
-    def handle_subdomains(self, qname):
-        subdomain = qname[0:qname.find(self.domain) - 1]
-
-        subparts = subdomain.split('.')
-        if len(subparts) < 4:
-            if DEBUG:
-                log('subparts less than 4')
-            self.handle_self(qname)
-            return
-
-        ipaddress = subparts[-4:]
-        if DEBUG:
-            log('ip: %s' % ipaddress)
-        for part in ipaddress:
-            if re.match('^\d{1,3}$', part) is None:
-                if DEBUG:
-                    log('%s is not a number' % part)
-                self.handle_self(qname)
-                return
-            parti = int(part)
-            if parti < 0 or parti > 255:
-                if DEBUG:
-                    log('%d is too big/small' % parti)
-                self.handle_self(qname)
-                return
-
-        write('DATA', qname, 'IN', 'A', self.ttl, self.id, '%s.%s.%s.%s' % (ipaddress[0], ipaddress[1], ipaddress[2], ipaddress[3]))
-        self.write_name_servers(qname)
-        write('END')
-
-    def handle_nameservers(self, qname):
-        ip = self.name_servers[qname]
-        write('DATA', qname, 'IN', 'A', self.ttl, self.id, ip)
-        write('END')
-
-    def write_name_servers(self, qname):
-        for nameServer in self.name_servers:
-            write('DATA', qname, 'IN', 'NS', self.ttl, self.id, nameServer)
-
-    def handle_soa(self, qname):
-        write('DATA', qname, 'IN', 'SOA', self.ttl, self.id, self.soa)
-        write('END')
-
-    def handle_unknown(self, qtype, qname):
-        write('LOG', 'Unknown type: %s, domain: %s' % (qtype, qname))
-        write('END')
-
-
-if __name__ == '__main__':
-    backend = DynamicBackend()
-    backend.configure()
-    backend.run()