Commit 8fd98d7d authored by Joshua Tauberer's avatar Joshua Tauberer

status checks: s/env['out']/output/

parent 1039a08b
...@@ -18,13 +18,11 @@ from mailconfig import get_mail_domains, get_mail_aliases ...@@ -18,13 +18,11 @@ from mailconfig import get_mail_domains, get_mail_aliases
from utils import shell, sort_domains, load_env_vars_from_file from utils import shell, sort_domains, load_env_vars_from_file
def run_checks(env, output): def run_checks(env, output):
env["out"] = output
# run systems checks # run systems checks
env["out"].add_heading("System") output.add_heading("System")
# check that services are running # check that services are running
if not run_services_checks(env): if not run_services_checks(env, output):
# If critical services are not running, stop. If bind9 isn't running, # If critical services are not running, stop. If bind9 isn't running,
# all later DNS checks will timeout and that will take forever to # all later DNS checks will timeout and that will take forever to
# go through, and if running over the web will cause a fastcgi timeout. # go through, and if running over the web will cause a fastcgi timeout.
...@@ -35,13 +33,13 @@ def run_checks(env, output): ...@@ -35,13 +33,13 @@ def run_checks(env, output):
# that in run_services checks.) # that in run_services checks.)
shell('check_call', ["/usr/sbin/rndc", "flush"], trap=True) shell('check_call', ["/usr/sbin/rndc", "flush"], trap=True)
run_system_checks(env) run_system_checks(env, output)
# perform other checks # perform other checks
run_network_checks(env) run_network_checks(env, output)
run_domain_checks(env) run_domain_checks(env, output)
def run_services_checks(env): def run_services_checks(env, output):
# Check that system services are running. # Check that system services are running.
import socket import socket
...@@ -78,11 +76,11 @@ def run_services_checks(env): ...@@ -78,11 +76,11 @@ def run_services_checks(env):
"127.0.0.1" if not service["public"] else env['PUBLIC_IP'], "127.0.0.1" if not service["public"] else env['PUBLIC_IP'],
service["port"])) service["port"]))
except OSError as e: except OSError as e:
env['out'].print_error("%s is not running (%s)." % (service['name'], str(e))) output.print_error("%s is not running (%s)." % (service['name'], str(e)))
# Why is nginx not running? # Why is nginx not running?
if service["port"] in (80, 443): if service["port"] in (80, 443):
env['out'].print_line(shell('check_output', ['nginx', '-t'], capture_stderr=True, trap=True)[1].strip()) output.print_line(shell('check_output', ['nginx', '-t'], capture_stderr=True, trap=True)[1].strip())
# Flag if local DNS is not running. # Flag if local DNS is not running.
if service["port"] == 53 and service["public"] == False: if service["port"] == 53 and service["public"] == False:
...@@ -92,17 +90,17 @@ def run_services_checks(env): ...@@ -92,17 +90,17 @@ def run_services_checks(env):
s.close() s.close()
if ok: if ok:
env['out'].print_ok("All system services are running.") output.print_ok("All system services are running.")
return ok return ok
def run_system_checks(env): def run_system_checks(env, output):
check_ssh_password(env) check_ssh_password(env, output)
check_software_updates(env) check_software_updates(env, output)
check_system_aliases(env) check_system_aliases(env, output)
check_free_disk_space(env) check_free_disk_space(env, output)
def check_ssh_password(env): def check_ssh_password(env, output):
# Check that SSH login with password is disabled. The openssh-server # Check that SSH login with password is disabled. The openssh-server
# package may not be installed so check that before trying to access # package may not be installed so check that before trying to access
# the configuration file. # the configuration file.
...@@ -111,57 +109,56 @@ def check_ssh_password(env): ...@@ -111,57 +109,56 @@ def check_ssh_password(env):
sshd = open("/etc/ssh/sshd_config").read() sshd = open("/etc/ssh/sshd_config").read()
if re.search("\nPasswordAuthentication\s+yes", sshd) \ if re.search("\nPasswordAuthentication\s+yes", sshd) \
or not re.search("\nPasswordAuthentication\s+no", sshd): or not re.search("\nPasswordAuthentication\s+no", sshd):
env['out'].print_error("""The SSH server on this machine permits password-based login. A more secure output.print_error("""The SSH server on this machine permits password-based login. A more secure
way to log in is using a public key. Add your SSH public key to $HOME/.ssh/authorized_keys, check way to log in is using a public key. Add your SSH public key to $HOME/.ssh/authorized_keys, check
that you can log in without a password, set the option 'PasswordAuthentication no' in that you can log in without a password, set the option 'PasswordAuthentication no' in
/etc/ssh/sshd_config, and then restart the openssh via 'sudo service ssh restart'.""") /etc/ssh/sshd_config, and then restart the openssh via 'sudo service ssh restart'.""")
else: else:
env['out'].print_ok("SSH disallows password-based login.") output.print_ok("SSH disallows password-based login.")
def check_software_updates(env): def check_software_updates(env, output):
# Check for any software package updates. # Check for any software package updates.
pkgs = list_apt_updates(apt_update=False) pkgs = list_apt_updates(apt_update=False)
if os.path.exists("/var/run/reboot-required"): if os.path.exists("/var/run/reboot-required"):
env['out'].print_error("System updates have been installed and a reboot of the machine is required.") output.print_error("System updates have been installed and a reboot of the machine is required.")
elif len(pkgs) == 0: elif len(pkgs) == 0:
env['out'].print_ok("System software is up to date.") output.print_ok("System software is up to date.")
else: else:
env['out'].print_error("There are %d software packages that can be updated." % len(pkgs)) output.print_error("There are %d software packages that can be updated." % len(pkgs))
for p in pkgs: for p in pkgs:
env['out'].print_line("%s (%s)" % (p["package"], p["version"])) output.print_line("%s (%s)" % (p["package"], p["version"]))
def check_system_aliases(env): def check_system_aliases(env, output):
# Check that the administrator alias exists since that's where all # Check that the administrator alias exists since that's where all
# admin email is automatically directed. # admin email is automatically directed.
check_alias_exists("administrator@" + env['PRIMARY_HOSTNAME'], env) check_alias_exists("administrator@" + env['PRIMARY_HOSTNAME'], env, output)
def check_free_disk_space(env): def check_free_disk_space(env, output):
# Check free disk space. # Check free disk space.
st = os.statvfs(env['STORAGE_ROOT']) st = os.statvfs(env['STORAGE_ROOT'])
bytes_total = st.f_blocks * st.f_frsize bytes_total = st.f_blocks * st.f_frsize
bytes_free = st.f_bavail * st.f_frsize bytes_free = st.f_bavail * st.f_frsize
disk_msg = "The disk has %s GB space remaining." % str(round(bytes_free/1024.0/1024.0/1024.0*10.0)/10.0) disk_msg = "The disk has %s GB space remaining." % str(round(bytes_free/1024.0/1024.0/1024.0*10.0)/10.0)
if bytes_free > .3 * bytes_total: if bytes_free > .3 * bytes_total:
env['out'].print_ok(disk_msg) output.print_ok(disk_msg)
elif bytes_free > .15 * bytes_total: elif bytes_free > .15 * bytes_total:
env['out'].print_warning(disk_msg) output.print_warning(disk_msg)
else: else:
env['out'].print_error(disk_msg) output.print_error(disk_msg)
def run_network_checks(env): def run_network_checks(env, output):
# Also see setup/network-checks.sh. # Also see setup/network-checks.sh.
env["out"].add_heading("Network") output.add_heading("Network")
# Stop if we cannot make an outbound connection on port 25. Many residential # Stop if we cannot make an outbound connection on port 25. Many residential
# networks block outbound port 25 to prevent their network from sending spam. # networks block outbound port 25 to prevent their network from sending spam.
# See if we can reach one of Google's MTAs with a 5-second timeout. # See if we can reach one of Google's MTAs with a 5-second timeout.
code, ret = shell("check_call", ["/bin/nc", "-z", "-w5", "aspmx.l.google.com", "25"], trap=True) code, ret = shell("check_call", ["/bin/nc", "-z", "-w5", "aspmx.l.google.com", "25"], trap=True)
if ret == 0: if ret == 0:
env['out'].print_ok("Outbound mail (SMTP port 25) is not blocked.") output.print_ok("Outbound mail (SMTP port 25) is not blocked.")
else: else:
env['out'].print_error("""Outbound mail (SMTP port 25) seems to be blocked by your network. You output.print_error("""Outbound mail (SMTP port 25) seems to be blocked by your network. You
will not be able to send any mail. Many residential networks block port 25 to prevent hijacked will not be able to send any mail. Many residential networks block port 25 to prevent hijacked
machines from being able to send spam. A quick connection test to Google's mail server on port 25 machines from being able to send spam. A quick connection test to Google's mail server on port 25
failed.""") failed.""")
...@@ -173,13 +170,13 @@ def run_network_checks(env): ...@@ -173,13 +170,13 @@ def run_network_checks(env):
rev_ip4 = ".".join(reversed(env['PUBLIC_IP'].split('.'))) rev_ip4 = ".".join(reversed(env['PUBLIC_IP'].split('.')))
zen = query_dns(rev_ip4+'.zen.spamhaus.org', 'A', nxdomain=None) zen = query_dns(rev_ip4+'.zen.spamhaus.org', 'A', nxdomain=None)
if zen is None: if zen is None:
env['out'].print_ok("IP address is not blacklisted by zen.spamhaus.org.") output.print_ok("IP address is not blacklisted by zen.spamhaus.org.")
else: else:
env['out'].print_error("""The IP address of this machine %s is listed in the Spamhaus Block List (code %s), output.print_error("""The IP address of this machine %s is listed in the Spamhaus Block List (code %s),
which may prevent recipients from receiving your email. See http://www.spamhaus.org/query/ip/%s.""" which may prevent recipients from receiving your email. See http://www.spamhaus.org/query/ip/%s."""
% (env['PUBLIC_IP'], zen, env['PUBLIC_IP'])) % (env['PUBLIC_IP'], zen, env['PUBLIC_IP']))
def run_domain_checks(env): def run_domain_checks(env, output):
# Get the list of domains we handle mail for. # Get the list of domains we handle mail for.
mail_domains = get_mail_domains(env) mail_domains = get_mail_domains(env)
...@@ -192,29 +189,29 @@ def run_domain_checks(env): ...@@ -192,29 +189,29 @@ def run_domain_checks(env):
# Check the domains. # Check the domains.
for domain in sort_domains(mail_domains | dns_domains | web_domains, env): for domain in sort_domains(mail_domains | dns_domains | web_domains, env):
env["out"].add_heading(domain) output.add_heading(domain)
if domain == env["PRIMARY_HOSTNAME"]: if domain == env["PRIMARY_HOSTNAME"]:
check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles) check_primary_hostname_dns(domain, env, output, dns_domains, dns_zonefiles)
if domain in dns_domains: if domain in dns_domains:
check_dns_zone(domain, env, dns_zonefiles) check_dns_zone(domain, env, output, dns_zonefiles)
if domain in mail_domains: if domain in mail_domains:
check_mail_domain(domain, env) check_mail_domain(domain, env, output)
if domain in web_domains: if domain in web_domains:
check_web_domain(domain, env) check_web_domain(domain, env, output)
if domain in dns_domains: if domain in dns_domains:
check_dns_zone_suggestions(domain, env, dns_zonefiles) check_dns_zone_suggestions(domain, env, output, dns_zonefiles)
def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles): def check_primary_hostname_dns(domain, env, output, dns_domains, dns_zonefiles):
# If a DS record is set on the zone containing this domain, check DNSSEC now. # If a DS record is set on the zone containing this domain, check DNSSEC now.
for zone in dns_domains: for zone in dns_domains:
if zone == domain or domain.endswith("." + zone): if zone == domain or domain.endswith("." + zone):
if query_dns(zone, "DS", nxdomain=None) is not None: if query_dns(zone, "DS", nxdomain=None) is not None:
check_dnssec(zone, env, dns_zonefiles, is_checking_primary=True) check_dnssec(zone, env, output, dns_zonefiles, is_checking_primary=True)
# Check that the ns1/ns2 hostnames resolve to A records. This information probably # Check that the ns1/ns2 hostnames resolve to A records. This information probably
# comes from the TLD since the information is set at the registrar as glue records. # comes from the TLD since the information is set at the registrar as glue records.
...@@ -223,9 +220,9 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles): ...@@ -223,9 +220,9 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles):
# will probably fail. # will probably fail.
ip = query_dns("ns1." + domain, "A") + '/' + query_dns("ns2." + domain, "A") ip = query_dns("ns1." + domain, "A") + '/' + query_dns("ns2." + domain, "A")
if ip == env['PUBLIC_IP'] + '/' + env['PUBLIC_IP']: if ip == env['PUBLIC_IP'] + '/' + env['PUBLIC_IP']:
env['out'].print_ok("Nameserver glue records are correct at registrar. [ns1/ns2.%s => %s]" % (env['PRIMARY_HOSTNAME'], env['PUBLIC_IP'])) output.print_ok("Nameserver glue records are correct at registrar. [ns1/ns2.%s => %s]" % (env['PRIMARY_HOSTNAME'], env['PUBLIC_IP']))
else: else:
env['out'].print_error("""Nameserver glue records are incorrect. The ns1.%s and ns2.%s nameservers must be configured at your domain name output.print_error("""Nameserver glue records are incorrect. The ns1.%s and ns2.%s nameservers must be configured at your domain name
registrar as having the IP address %s. They currently report addresses of %s. It may take several hours for registrar as having the IP address %s. They currently report addresses of %s. It may take several hours for
public DNS to update after a change.""" public DNS to update after a change."""
% (env['PRIMARY_HOSTNAME'], env['PRIMARY_HOSTNAME'], env['PUBLIC_IP'], ip)) % (env['PRIMARY_HOSTNAME'], env['PRIMARY_HOSTNAME'], env['PUBLIC_IP'], ip))
...@@ -233,9 +230,9 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles): ...@@ -233,9 +230,9 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles):
# Check that PRIMARY_HOSTNAME resolves to PUBLIC_IP in public DNS. # Check that PRIMARY_HOSTNAME resolves to PUBLIC_IP in public DNS.
ip = query_dns(domain, "A") ip = query_dns(domain, "A")
if ip == env['PUBLIC_IP']: if ip == env['PUBLIC_IP']:
env['out'].print_ok("Domain resolves to box's IP address. [%s => %s]" % (env['PRIMARY_HOSTNAME'], env['PUBLIC_IP'])) output.print_ok("Domain resolves to box's IP address. [%s => %s]" % (env['PRIMARY_HOSTNAME'], env['PUBLIC_IP']))
else: else:
env['out'].print_error("""This domain must resolve to your box's IP address (%s) in public DNS but it currently resolves output.print_error("""This domain must resolve to your box's IP address (%s) in public DNS but it currently resolves
to %s. It may take several hours for public DNS to update after a change. This problem may result from other to %s. It may take several hours for public DNS to update after a change. This problem may result from other
issues listed here.""" issues listed here."""
% (env['PUBLIC_IP'], ip)) % (env['PUBLIC_IP'], ip))
...@@ -245,9 +242,9 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles): ...@@ -245,9 +242,9 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles):
ipaddr_rev = dns.reversename.from_address(env['PUBLIC_IP']) ipaddr_rev = dns.reversename.from_address(env['PUBLIC_IP'])
existing_rdns = query_dns(ipaddr_rev, "PTR") existing_rdns = query_dns(ipaddr_rev, "PTR")
if existing_rdns == domain: if existing_rdns == domain:
env['out'].print_ok("Reverse DNS is set correctly at ISP. [%s => %s]" % (env['PUBLIC_IP'], env['PRIMARY_HOSTNAME'])) output.print_ok("Reverse DNS is set correctly at ISP. [%s => %s]" % (env['PUBLIC_IP'], env['PRIMARY_HOSTNAME']))
else: else:
env['out'].print_error("""Your box's reverse DNS is currently %s, but it should be %s. Your ISP or cloud provider will have instructions output.print_error("""Your box's reverse DNS is currently %s, but it should be %s. Your ISP or cloud provider will have instructions
on setting up reverse DNS for your box at %s.""" % (existing_rdns, domain, env['PUBLIC_IP']) ) on setting up reverse DNS for your box at %s.""" % (existing_rdns, domain, env['PUBLIC_IP']) )
# Check the TLSA record. # Check the TLSA record.
...@@ -255,29 +252,29 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles): ...@@ -255,29 +252,29 @@ def check_primary_hostname_dns(domain, env, dns_domains, dns_zonefiles):
tlsa25 = query_dns(tlsa_qname, "TLSA", nxdomain=None) tlsa25 = query_dns(tlsa_qname, "TLSA", nxdomain=None)
tlsa25_expected = build_tlsa_record(env) tlsa25_expected = build_tlsa_record(env)
if tlsa25 == tlsa25_expected: if tlsa25 == tlsa25_expected:
env['out'].print_ok("""The DANE TLSA record for incoming mail is correct (%s).""" % tlsa_qname,) output.print_ok("""The DANE TLSA record for incoming mail is correct (%s).""" % tlsa_qname,)
elif tlsa25 is None: elif tlsa25 is None:
env['out'].print_error("""The DANE TLSA record for incoming mail is not set. This is optional.""") output.print_error("""The DANE TLSA record for incoming mail is not set. This is optional.""")
else: else:
env['out'].print_error("""The DANE TLSA record for incoming mail (%s) is not correct. It is '%s' but it should be '%s'. output.print_error("""The DANE TLSA record for incoming mail (%s) is not correct. It is '%s' but it should be '%s'.
It may take several hours for public DNS to update after a change.""" It may take several hours for public DNS to update after a change."""
% (tlsa_qname, tlsa25, tlsa25_expected)) % (tlsa_qname, tlsa25, tlsa25_expected))
# Check that the hostmaster@ email address exists. # Check that the hostmaster@ email address exists.
check_alias_exists("hostmaster@" + domain, env) check_alias_exists("hostmaster@" + domain, env, output)
def check_alias_exists(alias, env): def check_alias_exists(alias, env, output):
mail_alises = dict(get_mail_aliases(env)) mail_alises = dict(get_mail_aliases(env))
if alias in mail_alises: if alias in mail_alises:
env['out'].print_ok("%s exists as a mail alias [=> %s]" % (alias, mail_alises[alias])) output.print_ok("%s exists as a mail alias [=> %s]" % (alias, mail_alises[alias]))
else: else:
env['out'].print_error("""You must add a mail alias for %s and direct email to you or another administrator.""" % alias) output.print_error("""You must add a mail alias for %s and direct email to you or another administrator.""" % alias)
def check_dns_zone(domain, env, dns_zonefiles): def check_dns_zone(domain, env, output, dns_zonefiles):
# If a DS record is set at the registrar, check DNSSEC first because it will affect the NS query. # If a DS record is set at the registrar, check DNSSEC first because it will affect the NS query.
# If it is not set, we suggest it last. # If it is not set, we suggest it last.
if query_dns(domain, "DS", nxdomain=None) is not None: if query_dns(domain, "DS", nxdomain=None) is not None:
check_dnssec(domain, env, dns_zonefiles) check_dnssec(domain, env, output, dns_zonefiles)
# We provide a DNS zone for the domain. It should have NS records set up # We provide a DNS zone for the domain. It should have NS records set up
# at the domain name's registrar pointing to this box. The secondary DNS # at the domain name's registrar pointing to this box. The secondary DNS
...@@ -292,20 +289,20 @@ def check_dns_zone(domain, env, dns_zonefiles): ...@@ -292,20 +289,20 @@ def check_dns_zone(domain, env, dns_zonefiles):
custom_dns.get("_secondary_nameserver", "ns2." + env['PRIMARY_HOSTNAME']), custom_dns.get("_secondary_nameserver", "ns2." + env['PRIMARY_HOSTNAME']),
])) ]))
if existing_ns.lower() == correct_ns.lower(): if existing_ns.lower() == correct_ns.lower():
env['out'].print_ok("Nameservers are set correctly at registrar. [%s]" % correct_ns) output.print_ok("Nameservers are set correctly at registrar. [%s]" % correct_ns)
else: else:
env['out'].print_error("""The nameservers set on this domain are incorrect. They are currently %s. Use your domain name registrar's output.print_error("""The nameservers set on this domain are incorrect. They are currently %s. Use your domain name registrar's
control panel to set the nameservers to %s.""" control panel to set the nameservers to %s."""
% (existing_ns, correct_ns) ) % (existing_ns, correct_ns) )
def check_dns_zone_suggestions(domain, env, dns_zonefiles): def check_dns_zone_suggestions(domain, env, output, dns_zonefiles):
# Since DNSSEC is optional, if a DS record is NOT set at the registrar suggest it. # Since DNSSEC is optional, if a DS record is NOT set at the registrar suggest it.
# (If it was set, we did the check earlier.) # (If it was set, we did the check earlier.)
if query_dns(domain, "DS", nxdomain=None) is None: if query_dns(domain, "DS", nxdomain=None) is None:
check_dnssec(domain, env, dns_zonefiles) check_dnssec(domain, env, output, dns_zonefiles)
def check_dnssec(domain, env, dns_zonefiles, is_checking_primary=False): def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False):
# See if the domain has a DS record set at the registrar. The DS record may have # See if the domain has a DS record set at the registrar. The DS record may have
# several forms. We have to be prepared to check for any valid record. We've # several forms. We have to be prepared to check for any valid record. We've
# pre-generated all of the valid digests --- read them in. # pre-generated all of the valid digests --- read them in.
...@@ -327,54 +324,54 @@ def check_dnssec(domain, env, dns_zonefiles, is_checking_primary=False): ...@@ -327,54 +324,54 @@ def check_dnssec(domain, env, dns_zonefiles, is_checking_primary=False):
if ds_looks_valid: ds = ds.split(" ") if ds_looks_valid: ds = ds.split(" ")
if ds_looks_valid and ds[0] == ds_keytag and ds[1] == ds_alg and ds[3] == digests.get(ds[2]): if ds_looks_valid and ds[0] == ds_keytag and ds[1] == ds_alg and ds[3] == digests.get(ds[2]):
if is_checking_primary: return if is_checking_primary: return
env['out'].print_ok("DNSSEC 'DS' record is set correctly at registrar.") output.print_ok("DNSSEC 'DS' record is set correctly at registrar.")
else: else:
if ds == None: if ds == None:
if is_checking_primary: return if is_checking_primary: return
env['out'].print_error("""This domain's DNSSEC DS record is not set. The DS record is optional. The DS record activates DNSSEC. output.print_error("""This domain's DNSSEC DS record is not set. The DS record is optional. The DS record activates DNSSEC.
To set a DS record, you must follow the instructions provided by your domain name registrar and provide to them this information:""") To set a DS record, you must follow the instructions provided by your domain name registrar and provide to them this information:""")
else: else:
if is_checking_primary: if is_checking_primary:
env['out'].print_error("""The DNSSEC 'DS' record for %s is incorrect. See further details below.""" % domain) output.print_error("""The DNSSEC 'DS' record for %s is incorrect. See further details below.""" % domain)
return return
env['out'].print_error("""This domain's DNSSEC DS record is incorrect. The chain of trust is broken between the public DNS system output.print_error("""This domain's DNSSEC DS record is incorrect. The chain of trust is broken between the public DNS system
and this machine's DNS server. It may take several hours for public DNS to update after a change. If you did not recently and this machine's DNS server. It may take several hours for public DNS to update after a change. If you did not recently
make a change, you must resolve this immediately by following the instructions provided by your domain name registrar and make a change, you must resolve this immediately by following the instructions provided by your domain name registrar and
provide to them this information:""") provide to them this information:""")
env['out'].print_line("") output.print_line("")
env['out'].print_line("Key Tag: " + ds_keytag + ("" if not ds_looks_valid or ds[0] == ds_keytag else " (Got '%s')" % ds[0])) output.print_line("Key Tag: " + ds_keytag + ("" if not ds_looks_valid or ds[0] == ds_keytag else " (Got '%s')" % ds[0]))
env['out'].print_line("Key Flags: KSK") output.print_line("Key Flags: KSK")
env['out'].print_line( output.print_line(
("Algorithm: %s / %s" % (ds_alg, alg_name_map[ds_alg])) ("Algorithm: %s / %s" % (ds_alg, alg_name_map[ds_alg]))
+ ("" if not ds_looks_valid or ds[1] == ds_alg else " (Got '%s')" % ds[1])) + ("" if not ds_looks_valid or ds[1] == ds_alg else " (Got '%s')" % ds[1]))
# see http://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xhtml # see http://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xhtml
env['out'].print_line("Digest Type: 2 / SHA-256") output.print_line("Digest Type: 2 / SHA-256")
# http://www.ietf.org/assignments/ds-rr-types/ds-rr-types.xml # http://www.ietf.org/assignments/ds-rr-types/ds-rr-types.xml
env['out'].print_line("Digest: " + digests['2']) output.print_line("Digest: " + digests['2'])
if ds_looks_valid and ds[3] != digests.get(ds[2]): if ds_looks_valid and ds[3] != digests.get(ds[2]):
env['out'].print_line("(Got digest type %s and digest %s which do not match.)" % (ds[2], ds[3])) output.print_line("(Got digest type %s and digest %s which do not match.)" % (ds[2], ds[3]))
env['out'].print_line("Public Key: ") output.print_line("Public Key: ")
env['out'].print_line(dnsssec_pubkey, monospace=True) output.print_line(dnsssec_pubkey, monospace=True)
env['out'].print_line("") output.print_line("")
env['out'].print_line("Bulk/Record Format:") output.print_line("Bulk/Record Format:")
env['out'].print_line("" + ds_correct[0]) output.print_line("" + ds_correct[0])
env['out'].print_line("") output.print_line("")
def check_mail_domain(domain, env): def check_mail_domain(domain, env, output):
# Check the MX record. # Check the MX record.
mx = query_dns(domain, "MX", nxdomain=None) mx = query_dns(domain, "MX", nxdomain=None)
expected_mx = "10 " + env['PRIMARY_HOSTNAME'] expected_mx = "10 " + env['PRIMARY_HOSTNAME']
if mx == expected_mx: if mx == expected_mx:
env['out'].print_ok("Domain's email is directed to this domain. [%s => %s]" % (domain, mx)) output.print_ok("Domain's email is directed to this domain. [%s => %s]" % (domain, mx))
elif mx == None: elif mx == None:
# A missing MX record is okay on the primary hostname because # A missing MX record is okay on the primary hostname because
# the primary hostname's A record (the MX fallback) is... itself, # the primary hostname's A record (the MX fallback) is... itself,
# which is what we want the MX to be. # which is what we want the MX to be.
if domain == env['PRIMARY_HOSTNAME']: if domain == env['PRIMARY_HOSTNAME']:
env['out'].print_ok("Domain's email is directed to this domain. [%s has no MX record, which is ok]" % (domain,)) output.print_ok("Domain's email is directed to this domain. [%s has no MX record, which is ok]" % (domain,))
# And a missing MX record is okay on other domains if the A record # And a missing MX record is okay on other domains if the A record
# matches the A record of the PRIMARY_HOSTNAME. Actually this will # matches the A record of the PRIMARY_HOSTNAME. Actually this will
...@@ -383,48 +380,48 @@ def check_mail_domain(domain, env): ...@@ -383,48 +380,48 @@ def check_mail_domain(domain, env):
domain_a = query_dns(domain, "A", nxdomain=None) domain_a = query_dns(domain, "A", nxdomain=None)
primary_a = query_dns(env['PRIMARY_HOSTNAME'], "A", nxdomain=None) primary_a = query_dns(env['PRIMARY_HOSTNAME'], "A", nxdomain=None)
if domain_a != None and domain_a == primary_a: if domain_a != None and domain_a == primary_a:
env['out'].print_ok("Domain's email is directed to this domain. [%s has no MX record but its A record is OK]" % (domain,)) output.print_ok("Domain's email is directed to this domain. [%s has no MX record but its A record is OK]" % (domain,))
else: else:
env['out'].print_error("""This domain's DNS MX record is not set. It should be '%s'. Mail will not output.print_error("""This domain's DNS MX record is not set. It should be '%s'. Mail will not
be delivered to this box. It may take several hours for public DNS to update after a be delivered to this box. It may take several hours for public DNS to update after a
change. This problem may result from other issues listed here.""" % (expected_mx,)) change. This problem may result from other issues listed here.""" % (expected_mx,))
else: else:
env['out'].print_error("""This domain's DNS MX record is incorrect. It is currently set to '%s' but should be '%s'. Mail will not output.print_error("""This domain's DNS MX record is incorrect. It is currently set to '%s' but should be '%s'. Mail will not
be delivered to this box. It may take several hours for public DNS to update after a change. This problem may result from be delivered to this box. It may take several hours for public DNS to update after a change. This problem may result from
other issues listed here.""" % (mx, expected_mx)) other issues listed here.""" % (mx, expected_mx))
# Check that the postmaster@ email address exists. # Check that the postmaster@ email address exists.
check_alias_exists("postmaster@" + domain, env) check_alias_exists("postmaster@" + domain, env, output)
# Stop if the domain is listed in the Spamhaus Domain Block List. # Stop if the domain is listed in the Spamhaus Domain Block List.
# The user might have chosen a domain that was previously in use by a spammer # The user might have chosen a domain that was previously in use by a spammer
# and will not be able to reliably send mail. # and will not be able to reliably send mail.
dbl = query_dns(domain+'.dbl.spamhaus.org', "A", nxdomain=None) dbl = query_dns(domain+'.dbl.spamhaus.org', "A", nxdomain=None)
if dbl is None: if dbl is None:
env['out'].print_ok("Domain is not blacklisted by dbl.spamhaus.org.") output.print_ok("Domain is not blacklisted by dbl.spamhaus.org.")
else: else:
env['out'].print_error("""This domain is listed in the Spamhaus Domain Block List (code %s), output.print_error("""This domain is listed in the Spamhaus Domain Block List (code %s),
which may prevent recipients from receiving your mail. which may prevent recipients from receiving your mail.
See http://www.spamhaus.org/dbl/ and http://www.spamhaus.org/query/domain/%s.""" % (dbl, domain)) See http://www.spamhaus.org/dbl/ and http://www.spamhaus.org/query/domain/%s.""" % (dbl, domain))
def check_web_domain(domain, env): def check_web_domain(domain, env, output):
# See if the domain's A record resolves to our PUBLIC_IP. This is already checked # See if the domain's A record resolves to our PUBLIC_IP. This is already checked
# for PRIMARY_HOSTNAME, for which it is required for mail specifically. For it and # for PRIMARY_HOSTNAME, for which it is required for mail specifically. For it and
# other domains, it is required to access its website. # other domains, it is required to access its website.
if domain != env['PRIMARY_HOSTNAME']: if domain != env['PRIMARY_HOSTNAME']:
ip = query_dns(domain, "A") ip = query_dns(domain, "A")
if ip == env['PUBLIC_IP']: if ip == env['PUBLIC_IP']:
env['out'].print_ok("Domain resolves to this box's IP address. [%s => %s]" % (domain, env['PUBLIC_IP'])) output.print_ok("Domain resolves to this box's IP address. [%s => %s]" % (domain, env['PUBLIC_IP']))
else: else:
env['out'].print_error("""This domain should resolve to your box's IP address (%s) if you would like the box to serve output.print_error("""This domain should resolve to your box's IP address (%s) if you would like the box to serve
webmail or a website on this domain. The domain currently resolves to %s in public DNS. It may take several hours for webmail or a website on this domain. The domain currently resolves to %s in public DNS. It may take several hours for
public DNS to update after a change. This problem may result from other issues listed here.""" % (env['PUBLIC_IP'], ip)) public DNS to update after a change. This problem may result from other issues listed here.""" % (env['PUBLIC_IP'], ip))
# We need a SSL certificate for PRIMARY_HOSTNAME because that's where the # We need a SSL certificate for PRIMARY_HOSTNAME because that's where the
# user will log in with IMAP or webmail. Any other domain we serve a # user will log in with IMAP or webmail. Any other domain we serve a
# website for also needs a signed certificate. # website for also needs a signed certificate.
check_ssl_cert(domain, env) check_ssl_cert(domain, env, output)
def query_dns(qname, rtype, nxdomain='[Not Set]'): def query_dns(qname, rtype, nxdomain='[Not Set]'):
# Make the qname absolute by appending a period. Without this, dns.resolver.query # Make the qname absolute by appending a period. Without this, dns.resolver.query
...@@ -451,7 +448,7 @@ def query_dns(qname, rtype, nxdomain='[Not Set]'): ...@@ -451,7 +448,7 @@ def query_dns(qname, rtype, nxdomain='[Not Set]'):
# can compare to a well known order. # can compare to a well known order.
return "; ".join(sorted(str(r).rstrip('.') for r in response)) return "; ".join(sorted(str(r).rstrip('.') for r in response))
def check_ssl_cert(domain, env): def check_ssl_cert(domain, env, output):
# Check that SSL certificate is signed. # Check that SSL certificate is signed.
# Skip the check if the A record is not pointed here. # Skip the check if the A record is not pointed here.
...@@ -461,7 +458,7 @@ def check_ssl_cert(domain, env): ...@@ -461,7 +458,7 @@ def check_ssl_cert(domain, env):
ssl_key, ssl_certificate = get_domain_ssl_files(domain, env) ssl_key, ssl_certificate = get_domain_ssl_files(domain, env)
if not os.path.exists(ssl_certificate): if not os.path.exists(ssl_certificate):
env['out'].print_error("The SSL certificate file for this domain is missing.") output.print_error("The SSL certificate file for this domain is missing.")
return return
# Check that the certificate is good. # Check that the certificate is good.
...@@ -470,7 +467,7 @@ def check_ssl_cert(domain, env): ...@@ -470,7 +467,7 @@ def check_ssl_cert(domain, env):
if cert_status == "OK": if cert_status == "OK":
# The certificate is ok. The details has expiry info. # The certificate is ok. The details has expiry info.
env['out'].print_ok("SSL certificate is signed & valid. " + cert_status_details) output.print_ok("SSL certificate is signed & valid. " + cert_status_details)
elif cert_status == "SELF-SIGNED": elif cert_status == "SELF-SIGNED":
# Offer instructions for purchasing a signed certificate. # Offer instructions for purchasing a signed certificate.
...@@ -485,25 +482,25 @@ def check_ssl_cert(domain, env): ...@@ -485,25 +482,25 @@ def check_ssl_cert(domain, env):
fingerprint = re.sub(".*Fingerprint=", "", fingerprint).strip() fingerprint = re.sub(".*Fingerprint=", "", fingerprint).strip()
if domain == env['PRIMARY_HOSTNAME']: if domain == env['PRIMARY_HOSTNAME']:
env['out'].print_error("""The SSL certificate for this domain is currently self-signed. You will get a security output.print_error("""The SSL certificate for this domain is currently self-signed. You will get a security
warning when you check or send email and when visiting this domain in a web browser (for webmail or warning when you check or send email and when visiting this domain in a web browser (for webmail or
static site hosting). Use the SSL Certificates page in this control panel to install a signed SSL certificate. static site hosting). Use the SSL Certificates page in this control panel to install a signed SSL certificate.
You may choose to leave the self-signed certificate in place and confirm the security exception, but check that You may choose to leave the self-signed certificate in place and confirm the security exception, but check that
the certificate fingerprint matches the following:""") the certificate fingerprint matches the following:""")
env['out'].print_line("") output.print_line("")
env['out'].print_line(" " + fingerprint, monospace=True) output.print_line(" " + fingerprint, monospace=True)
else: else:
env['out'].print_warning("""The SSL certificate for this domain is currently self-signed. Visitors to a website on output.print_warning("""The SSL certificate for this domain is currently self-signed. Visitors to a website on
this domain will get a security warning. If you are not serving a website on this domain, then it is this domain will get a security warning. If you are not serving a website on this domain, then it is
safe to leave the self-signed certificate in place. Use the SSL Certificates page in this control panel to safe to leave the self-signed certificate in place. Use the SSL Certificates page in this control panel to
install a signed SSL certificate.""") install a signed SSL certificate.""")
else: else:
env['out'].print_error("The SSL certificate has a problem: " + cert_status) output.print_error("The SSL certificate has a problem: " + cert_status)
if cert_status_details: if cert_status_details:
env['out'].print_line("") output.print_line("")
env['out'].print_line(cert_status_details) output.print_line(cert_status_details)
env['out'].print_line("") output.print_line("")
def check_certificate(domain, ssl_certificate, ssl_private_key): def check_certificate(domain, ssl_certificate, ssl_private_key):
# Use openssl verify to check the status of a certificate. # Use openssl verify to check the status of a certificate.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment