Add tests and support for blacklisting IPs
[nip.io] / nipio_tests / backend_test.py
1 import collections
2 import os
3 import sys
4 import unittest
5
6 from assertpy import assert_that
7 from mock.mock import patch, call
8
9 from nipio.backend import DynamicBackend
10
11
12 class DynamicBackendTest(unittest.TestCase):
13     def setUp(self):
14         self.mock_sys_patcher = patch("nipio.backend.sys")
15         self.mock_sys = self.mock_sys_patcher.start()
16
17         self.mock_sys.stderr.write = sys.stderr.write
18
19         import nipio
20         nipio.backend._is_debug = lambda: True
21
22     def tearDown(self):
23         sys.stderr.flush()
24
25         self.mock_sys_patcher.stop()
26
27     def test_backend_ends_response_to_ANY_request_if_ip_is_blacklisted(self):
28         self._send_commands(["Q", "subdomain.127.0.0.2.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
29
30         self._run_backend()
31
32         self._assert_expected_responses(["LOG", "Blacklisted: 127.0.0.2"])
33
34     def test_backend_ends_response_to_A_request_if_ip_is_blacklisted(self):
35         self._send_commands(["Q", "subdomain.127.0.0.2.lcl.io", "IN", "A", "1", "127.0.0.1"])
36
37         self._run_backend()
38
39         self._assert_expected_responses(
40             ["LOG", "Blacklisted: 127.0.0.2"]
41         )
42
43     def test_backend_responds_to_ANY_request_with_valid_ip(self):
44         self._send_commands(["Q", "subdomain.127.0.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
45
46         self._run_backend()
47
48         self._assert_expected_responses(
49             ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.1"],
50             ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
51             ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
52         )
53
54     def test_backend_responds_to_A_request_with_valid_ip(self):
55         self._send_commands(["Q", "subdomain.127.0.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
56
57         self._run_backend()
58
59         self._assert_expected_responses(
60             ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.1"],
61             ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
62             ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
63         )
64
65     def test_backend_responds_to_invalid_ip_in_ANY_request_with_self_ip(self):
66         self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
67
68         self._run_backend()
69
70         self._assert_expected_responses(
71             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
72             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
73             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
74         )
75
76     def test_backend_responds_to_invalid_ip_in_A_request_with_self(self):
77         self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
78
79         self._run_backend()
80
81         self._assert_expected_responses(
82             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
83             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
84             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
85         )
86
87     def test_backend_responds_to_short_ip_in_ANY_request_with_self_ip(self):
88         self._send_commands(["Q", "127.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
89
90         self._run_backend()
91
92         self._assert_expected_responses(
93             ["DATA", "127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
94             ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
95             ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
96         )
97
98     def test_backend_responds_to_short_ip_in_A_request_with_self(self):
99         self._send_commands(["Q", "127.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
100
101         self._run_backend()
102
103         self._assert_expected_responses(
104             ["DATA", "127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
105             ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
106             ["DATA", "127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
107         )
108
109     def test_backend_responds_to_large_ip_in_ANY_request_with_self(self):
110         self._send_commands(["Q", "subdomain.127.0.300.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
111
112         self._run_backend()
113
114         self._assert_expected_responses(
115             ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
116             ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
117             ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
118         )
119
120     def test_backend_responds_to_large_ip_in_A_request_with_self(self):
121         self._send_commands(["Q", "subdomain.127.0.300.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
122
123         self._run_backend()
124
125         self._assert_expected_responses(
126             ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
127             ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
128             ["DATA", "subdomain.127.0.300.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
129         )
130
131     def test_backend_responds_to_string_in_ip_in_ANY_request_with_self(self):
132         self._send_commands(["Q", "subdomain.127.0.STRING.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
133
134         self._run_backend()
135
136         self._assert_expected_responses(
137             ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
138             ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
139             ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
140         )
141
142     def test_backend_responds_to_string_in_ip_in_A_request_with_self(self):
143         self._send_commands(["Q", "subdomain.127.0.STRING.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
144
145         self._run_backend()
146
147         self._assert_expected_responses(
148             ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
149             ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
150             ["DATA", "subdomain.127.0.string.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
151         )
152
153     def test_backend_responds_to_no_ip_in_ANY_request_with_self(self):
154         self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
155
156         self._run_backend()
157
158         self._assert_expected_responses(
159             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
160             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
161             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
162         )
163
164     def test_backend_responds_to_no_ip_in_A_request_with_self(self):
165         self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "A", "1", "127.0.0.1"])
166
167         self._run_backend()
168
169         self._assert_expected_responses(
170             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
171             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
172             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
173         )
174
175     def test_backend_responds_to_self_domain_to_A_request(self):
176         self._send_commands(["Q", "lcl.io", "IN", "A", "1", "127.0.0.1"])
177
178         self._run_backend()
179
180         self._assert_expected_responses(
181             ["DATA", "lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
182             ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
183             ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
184         )
185
186     def test_backend_responds_to_self_domain_to_ANY_request(self):
187         self._send_commands(["Q", "lcl.io", "IN", "ANY", "1", "127.0.0.1"])
188
189         self._run_backend()
190
191         self._assert_expected_responses(
192             ["DATA", "lcl.io", "IN", "A", "200", "22", "127.0.0.33"],
193             ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns1.lcl.io"],
194             ["DATA", "lcl.io", "IN", "NS", "200", "22", "ns2.lcl.io"],
195         )
196
197     def test_backend_responds_to_name_servers_A_request_with_valid_ip(self):
198         self._send_commands(["Q", "ns1.lcl.io", "IN", "A", "1", "127.0.0.1"])
199
200         self._run_backend()
201
202         self._assert_expected_responses(
203             ["DATA", "ns1.lcl.io", "IN", "A", "200", "22", "127.0.0.34"],
204         )
205
206     def test_backend_responds_to_name_servers_ANY_request_with_valid_ip(self):
207         self._send_commands(["Q", "ns2.lcl.io", "IN", "ANY", "1", "127.0.0.1"])
208
209         self._run_backend()
210
211         self._assert_expected_responses(
212             ["DATA", "ns2.lcl.io", "IN", "A", "200", "22", "127.0.0.35"],
213         )
214
215     def test_backend_responds_to_SOA_request_for_self(self):
216         self._send_commands(["Q", "lcl.io", "IN", "SOA", "1", "127.0.0.1"])
217
218         self._run_backend()
219
220         self._assert_expected_responses(
221             ["DATA", "lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
222         )
223
224     def test_backend_responds_to_SOA_request_for_valid_ip(self):
225         self._send_commands(["Q", "subdomain.127.0.0.1.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
226
227         self._run_backend()
228
229         self._assert_expected_responses(
230             ["DATA", "subdomain.127.0.0.1.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
231         )
232
233     def test_backend_responds_to_SOA_request_for_invalid_ip(self):
234         self._send_commands(["Q", "subdomain.127.0.1.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
235
236         self._run_backend()
237
238         self._assert_expected_responses(
239             ["DATA", "subdomain.127.0.1.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
240         )
241
242     def test_backend_responds_to_SOA_request_for_no_ip(self):
243         self._send_commands(["Q", "subdomain.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
244
245         self._run_backend()
246
247         self._assert_expected_responses(
248             ["DATA", "subdomain.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
249         )
250
251     def test_backend_responds_to_SOA_request_for_nameserver(self):
252         self._send_commands(["Q", "ns1.lcl.io", "IN", "SOA", "1", "127.0.0.1"])
253
254         self._run_backend()
255
256         self._assert_expected_responses(
257             ["DATA", "ns1.lcl.io", "IN", "SOA", "200", "22", "MY_SOA"]
258         )
259
260     def test_backend_responds_to_A_request_for_unknown_domain_with_invalid_response(self):
261         self._send_commands(["Q", "unknown.domain", "IN", "A", "1", "127.0.0.1"])
262
263         self._run_backend()
264
265         self._assert_expected_responses(
266             ["LOG", "Unknown type: A, domain: unknown.domain"]
267         )
268
269     def test_backend_responds_to_invalid_request_with_invalid_response(self):
270         self._send_commands(["Q", "lcl.io", "IN", "INVALID", "1", "127.0.0.1"])
271
272         self._run_backend()
273
274         self._assert_expected_responses(
275             ["LOG", "Unknown type: INVALID, domain: lcl.io"]
276         )
277
278     def test_backend_responds_to_invalid_command_with_fail(self):
279         self._send_commands(["INVALID", "COMMAND"])
280
281         self._run_backend()
282
283         calls = [
284             call("OK"),
285             call("\t"),
286             call("We are good"),
287             call("\n"),
288
289             call("FAIL"),
290             call("\n"),
291         ]
292
293         self.mock_sys.stdout.write.assert_has_calls(calls)
294         assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))
295
296         assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(2)
297
298     def test_configure_with_full_config(self):
299         backend = self._configure_backend()
300
301         assert_that(backend.id).is_equal_to("55")
302         assert_that(backend.ip_address).is_equal_to("127.0.0.40")
303         assert_that(backend.domain).is_equal_to("lcl.io")
304         assert_that(backend.ttl).is_equal_to("1000")
305         assert_that(backend.name_servers).is_equal_to({"ns1.lcl.io": "127.0.0.41", "ns2.lcl.io" : "127.0.0.42"})
306         assert_that(backend.blacklisted_ips).is_equal_to(["10.0.0.100"])
307         assert_that(backend.soa).is_equal_to("ns1.lcl.io emailaddress@lcl.io 55")
308
309     def test_configure_with_config_missing_blacklists(self):
310         backend = self._configure_backend(filename="backend_test_no_blacklist.conf")
311
312         assert_that(backend.blacklisted_ips).is_empty()
313
314     def _run_backend(self):
315         backend = self._create_backend()
316         backend.run()
317
318     def _send_commands(self, *commands):
319         commands_to_send = ["HELO\t1\n"]
320
321         for command in commands:
322             commands_to_send.append("\t".join(command) + "\n")
323
324         commands_to_send.append("END\n")
325
326         self.mock_sys.stdin.readline.side_effect = commands_to_send
327
328     def _assert_expected_responses(self, *responses):
329         calls = [
330             call("OK"),
331             call("\t"),
332             call("We are good"),
333             call("\n"),
334         ]
335
336         for response in responses:
337             tab_separated = ["\t"] * (len(response) * 2 - 1)
338             tab_separated[0::2] = response
339             tab_separated.append("\n")
340
341             calls.extend([call(response_item) for response_item in tab_separated])
342
343         calls.extend([
344             call("END"),
345             call("\n"),
346         ])
347
348         self.mock_sys.stdout.write.assert_has_calls(calls)
349         assert_that(self.mock_sys.stdout.write.call_count).is_equal_to(len(calls))
350
351         assert_that(self.mock_sys.stdout.flush.call_count).is_equal_to(len(responses) + 2)
352
353     @staticmethod
354     def _create_backend():
355         backend = DynamicBackend()
356         backend.id = "22"
357         backend.soa = "MY_SOA"
358         backend.ip_address = "127.0.0.33"
359         backend.ttl = "200"
360         backend.name_servers = collections.OrderedDict([
361             ("ns1.lcl.io", "127.0.0.34"),
362             ("ns2.lcl.io", "127.0.0.35"),
363         ])
364         backend.domain = "lcl.io"
365         backend.blacklisted_ips = ["127.0.0.2"]
366         return backend
367
368     def _configure_backend(self, filename="backend_test.conf"):
369         backend = DynamicBackend()
370         backend._get_config_filename = lambda: self._get_test_config_filename(filename)
371         backend.configure()
372         return backend
373
374     def _get_test_config_filename(self, filename):
375         return os.path.join(os.path.dirname(os.path.realpath(__file__)), filename)
376