-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
120 lines (90 loc) · 4.04 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import argparse
from ast import arguments
from ipaddress import AddressValueError, IPv4Address
import logging
from operator import attrgetter, itemgetter
from typing import *
import yaml
import dhcp
from dns_handler import DNSServer
from yaml_helper import YamlHelper
def ipaddrlist_tostr(addresses: List[IPv4Address], joiner: str = ', ') -> str:
return joiner.join(map(attrgetter('exploded'), addresses))
def get_dhcp_info(yaml_conf: YamlHelper) -> Dict[str, Any]:
"""
Parse the YAML configuration object to determine which DNS servers to use
as fallback
"""
if yaml_conf.get_bool("dhcp-options/use-dhcp"):
return dhcp.get_dhcp_options(IPv4Address("255.255.255.255"), yaml_conf.get("dhcp-options/interface"))
else:
return {
'local_dns': [],
'local_domain': None,
'local_dhcp': None
}
def get_resolvers(yaml_conf: YamlHelper) -> Dict[bytes, List[IPv4Address]]:
"""
Get the configuration for the resolvers for specific domains
"""
# parse resolvers
lookup_servers = {}
if not yaml_conf:
return lookup_servers
for entry in yaml_conf.get_list('resolvers'):
domain, server_list = list(entry.items())[0]
if not isinstance(server_list, list):
server_list = [server_list]
if not domain.endswith('.'):
domain += "."
servers = lookup_servers.setdefault(domain.encode(), list())
for server in server_list:
try:
# if specified, use the fallback server
if server.casefold() == "fallback".casefold():
server = server.lower()
else:
# otherwise just add the server to the list
server = IPv4Address(server)
servers.append(server)
except AddressValueError:
logging.error(f"While getting resolvers, invalid IP address: {server}")
continue
# remove domains with no valid lookup servers
# empty lists are considered False, so we can just use itemgetter
return dict(filter(itemgetter(1), lookup_servers.items()))
def get_dns_server(yaml_conf: YamlHelper) -> DNSServer:
"""
Configure and return a DNSServer instance to use
"""
lookup_servers = get_resolvers(yaml_conf)
# Add the local domain on to the list of resolvers if there is one configured
dhcp_info = get_dhcp_info(yaml_conf)
if dhcp_info['local_domain']:
lookup_servers[dhcp_info['local_domain'] + b'.'] = dhcp_info['local_dns']
logging.info("Lookup table configured as such:\n" + '\n'.join(map(lambda each: f"\t- {each[0].decode()}: {ipaddrlist_tostr(each[1])}", lookup_servers.items())))
# Use fallback servers if provided in config.yml, otherwise use dhcp_info
fallback_servers = list(map(IPv4Address, yaml_conf.get_list("fallbacks") or dhcp_info['local_dns']))
logging.info(f"Fallback servers configured as such: [{ipaddrlist_tostr(fallback_servers)}]")
try:
server_address = yaml_conf.get_as("dns-server/ip", IPv4Address)
except AddressValueError:
logging.error("Invalid IP address specified for the DNS server!")
raise
return DNSServer(server_address, fallback_servers=fallback_servers, lookup_servers=lookup_servers)
def parse_arguments(arguments=None):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", help="Activate more verbose logging", action="store_true", dest="verbose", default=False)
return parser.parse_args(arguments)
def main():
with open("config.yml") as file:
yaml_conf = YamlHelper(yaml.safe_load(file))
dns_server = get_dns_server(yaml_conf)
logging.info("Starting DNS server...")
dns_server.serve_forever()
logging.info("DNS Server stopped.")
if __name__ == "__main__":
arguments = parse_arguments()
log_level = logging.DEBUG if arguments.verbose else logging.INFO
logging.basicConfig(level=log_level, format="[%(asctime)s %(levelname)s]: %(message)s", datefmt="%H:%M")
main()