Commit 787beab6 authored by Joshua Tauberer's avatar Joshua Tauberer

choose the best SSL cert from among the installed certificates; use the server...

choose the best SSL cert from among the installed certificates; use the server certificate instead of self-signed certificates

For HTTPS for the non-primary domains, instead of selecting an SSL certificate by expecting it to be in a directory named after the domain name (with special-case lookups
for www domains, and reusing the server certificate where possible), now scan all of the certificates that have been installed and just pick the best to use for each domain.

If no certificate is available, don't create a self-signed certificate anymore. This wasn't ever really necessary. Instead just use the server certificate.
parent 58349a94
...@@ -318,9 +318,9 @@ def dns_get_dump(): ...@@ -318,9 +318,9 @@ def dns_get_dump():
@app.route('/ssl/csr/<domain>', methods=['POST']) @app.route('/ssl/csr/<domain>', methods=['POST'])
@authorized_personnel_only @authorized_personnel_only
def ssl_get_csr(domain): def ssl_get_csr(domain):
from web_update import get_domain_ssl_files, create_csr from web_update import create_csr
ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, env) ssl_private_key = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem'))
return create_csr(domain, ssl_key, env) return create_csr(domain, ssl_private_key, env)
@app.route('/ssl/install', methods=['POST']) @app.route('/ssl/install', methods=['POST'])
@authorized_personnel_only @authorized_personnel_only
......
...@@ -13,7 +13,7 @@ import dateutil.parser, dateutil.tz ...@@ -13,7 +13,7 @@ import dateutil.parser, dateutil.tz
import idna import idna
from dns_update import get_dns_zones, build_tlsa_record, get_custom_dns_config, get_secondary_dns from dns_update import get_dns_zones, build_tlsa_record, get_custom_dns_config, get_secondary_dns
from web_update import get_web_domains, get_default_www_redirects, get_domain_ssl_files, get_domains_with_a_records from web_update import get_web_domains, get_default_www_redirects, get_ssl_certificates, get_domain_ssl_files, get_domains_with_a_records
from mailconfig import get_mail_domains, get_mail_aliases from mailconfig import get_mail_domains, get_mail_aliases
from utils import shell, sort_domains, load_env_vars_from_file, load_settings from utils import shell, sort_domains, load_env_vars_from_file, load_settings
...@@ -248,19 +248,21 @@ def run_domain_checks(rounded_time, env, output, pool): ...@@ -248,19 +248,21 @@ def run_domain_checks(rounded_time, env, output, pool):
# Get the list of domains that we don't serve web for because of a custom CNAME/A record. # Get the list of domains that we don't serve web for because of a custom CNAME/A record.
domains_with_a_records = get_domains_with_a_records(env) domains_with_a_records = get_domains_with_a_records(env)
ssl_certificates = get_ssl_certificates(env)
# Serial version: # Serial version:
#for domain in sort_domains(domains_to_check, env): #for domain in sort_domains(domains_to_check, env):
# run_domain_checks_on_domain(domain, rounded_time, env, dns_domains, dns_zonefiles, mail_domains, web_domains) # run_domain_checks_on_domain(domain, rounded_time, env, dns_domains, dns_zonefiles, mail_domains, web_domains)
# Parallelize the checks across a worker pool. # Parallelize the checks across a worker pool.
args = ((domain, rounded_time, env, dns_domains, dns_zonefiles, mail_domains, web_domains, domains_with_a_records) args = ((domain, rounded_time, env, dns_domains, dns_zonefiles, mail_domains, web_domains, domains_with_a_records, ssl_certificates)
for domain in domains_to_check) for domain in domains_to_check)
ret = pool.starmap(run_domain_checks_on_domain, args, chunksize=1) ret = pool.starmap(run_domain_checks_on_domain, args, chunksize=1)
ret = dict(ret) # (domain, output) => { domain: output } ret = dict(ret) # (domain, output) => { domain: output }
for domain in sort_domains(ret, env): for domain in sort_domains(ret, env):
ret[domain].playback(output) ret[domain].playback(output)
def run_domain_checks_on_domain(domain, rounded_time, env, dns_domains, dns_zonefiles, mail_domains, web_domains, domains_with_a_records): def run_domain_checks_on_domain(domain, rounded_time, env, dns_domains, dns_zonefiles, mail_domains, web_domains, domains_with_a_records, ssl_certificates):
output = BufferedOutput() output = BufferedOutput()
# The domain is IDNA-encoded in the database, but for display use Unicode. # The domain is IDNA-encoded in the database, but for display use Unicode.
...@@ -282,7 +284,7 @@ def run_domain_checks_on_domain(domain, rounded_time, env, dns_domains, dns_zone ...@@ -282,7 +284,7 @@ def run_domain_checks_on_domain(domain, rounded_time, env, dns_domains, dns_zone
check_mail_domain(domain, env, output) check_mail_domain(domain, env, output)
if domain in web_domains: if domain in web_domains:
check_web_domain(domain, rounded_time, env, output) check_web_domain(domain, rounded_time, ssl_certificates, env, output)
if domain in dns_domains: if domain in dns_domains:
check_dns_zone_suggestions(domain, env, output, dns_zonefiles, domains_with_a_records) check_dns_zone_suggestions(domain, env, output, dns_zonefiles, domains_with_a_records)
...@@ -528,7 +530,7 @@ def check_mail_domain(domain, env, output): ...@@ -528,7 +530,7 @@ def check_mail_domain(domain, env, output):
which may prevent recipients from receiving your mail. which may prevent recipients from receiving your mail.
See http://www.spamhaus.org/dbl/ and http://www.spamhaus.org/query/domain/%s.""" % (dbl, domain)) See http://www.spamhaus.org/dbl/ and http://www.spamhaus.org/query/domain/%s.""" % (dbl, domain))
def check_web_domain(domain, rounded_time, env, output): def check_web_domain(domain, rounded_time, ssl_certificates, env, output):
# See if the domain's A record resolves to our PUBLIC_IP. This is already checked # See if the domain's A record resolves to our PUBLIC_IP. This is already checked
# for PRIMARY_HOSTNAME, for which it is required for mail specifically. For it and # for PRIMARY_HOSTNAME, for which it is required for mail specifically. For it and
# other domains, it is required to access its website. # other domains, it is required to access its website.
...@@ -544,7 +546,7 @@ def check_web_domain(domain, rounded_time, env, output): ...@@ -544,7 +546,7 @@ def check_web_domain(domain, rounded_time, env, output):
# We need a SSL certificate for PRIMARY_HOSTNAME because that's where the # We need a SSL certificate for PRIMARY_HOSTNAME because that's where the
# user will log in with IMAP or webmail. Any other domain we serve a # user will log in with IMAP or webmail. Any other domain we serve a
# website for also needs a signed certificate. # website for also needs a signed certificate.
check_ssl_cert(domain, rounded_time, env, output) check_ssl_cert(domain, rounded_time, ssl_certificates, env, output)
def query_dns(qname, rtype, nxdomain='[Not Set]'): def query_dns(qname, rtype, nxdomain='[Not Set]'):
# Make the qname absolute by appending a period. Without this, dns.resolver.query # Make the qname absolute by appending a period. Without this, dns.resolver.query
...@@ -571,19 +573,24 @@ def query_dns(qname, rtype, nxdomain='[Not Set]'): ...@@ -571,19 +573,24 @@ def query_dns(qname, rtype, nxdomain='[Not Set]'):
# can compare to a well known order. # can compare to a well known order.
return "; ".join(sorted(str(r).rstrip('.') for r in response)) return "; ".join(sorted(str(r).rstrip('.') for r in response))
def check_ssl_cert(domain, rounded_time, env, output): def check_ssl_cert(domain, rounded_time, ssl_certificates, env, output):
# Check that SSL certificate is signed. # Check that SSL certificate is signed.
# Skip the check if the A record is not pointed here. # Skip the check if the A record is not pointed here.
if query_dns(domain, "A", None) not in (env['PUBLIC_IP'], None): return if query_dns(domain, "A", None) not in (env['PUBLIC_IP'], None): return
# Where is the SSL stored? # Where is the SSL stored?
ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, env) x = get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=True)
if not os.path.exists(ssl_certificate): if x is None:
output.print_error("The SSL certificate file for this domain is missing.") output.print_warning("""No SSL certificate is installed for this domain. Visitors to a website on
this domain will get a security warning. If you are not serving a website on this domain, you do
not need to take any action. Use the SSL Certificates page in the control panel to install a
SSL certificate.""")
return return
ssl_key, ssl_certificate, ssl_via = x
# Check that the certificate is good. # Check that the certificate is good.
cert_status, cert_status_details = check_certificate(domain, ssl_certificate, ssl_key, rounded_time=rounded_time) cert_status, cert_status_details = check_certificate(domain, ssl_certificate, ssl_key, rounded_time=rounded_time)
...@@ -607,16 +614,13 @@ def check_ssl_cert(domain, rounded_time, env, output): ...@@ -607,16 +614,13 @@ def check_ssl_cert(domain, rounded_time, env, output):
if domain == env['PRIMARY_HOSTNAME']: if domain == env['PRIMARY_HOSTNAME']:
output.print_error("""The SSL certificate for this domain is currently self-signed. You will get a security output.print_error("""The SSL certificate for this domain is currently self-signed. You will get a security
warning when you check or send email and when visiting this domain in a web browser (for webmail or warning when you check or send email and when visiting this domain in a web browser (for webmail or
static site hosting). Use the SSL Certificates page in this control panel to install a signed SSL certificate. static site hosting). Use the SSL Certificates page in the control panel to install a signed SSL certificate.
You may choose to leave the self-signed certificate in place and confirm the security exception, but check that You may choose to leave the self-signed certificate in place and confirm the security exception, but check that
the certificate fingerprint matches the following:""") the certificate fingerprint matches the following:""")
output.print_line("") output.print_line("")
output.print_line(" " + fingerprint, monospace=True) output.print_line(" " + fingerprint, monospace=True)
else: else:
output.print_warning("""The SSL certificate for this domain is currently self-signed. Visitors to a website on output.print_error("""The SSL certificate for this domain is self-signed.""")
this domain will get a security warning. If you are not serving a website on this domain, then it is
safe to leave the self-signed certificate in place. Use the SSL Certificates page in this control panel to
install a signed SSL certificate.""")
else: else:
output.print_error("The SSL certificate has a problem: " + cert_status) output.print_error("The SSL certificate has a problem: " + cert_status)
...@@ -630,8 +634,7 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring ...@@ -630,8 +634,7 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
# for the provided domain. # for the provided domain.
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.x509 import Certificate, DNSName, ExtensionNotFound, OID_COMMON_NAME, OID_SUBJECT_ALTERNATIVE_NAME from cryptography.x509 import Certificate
import idna
# The ssl_certificate file may contain a chain of certificates. We'll # The ssl_certificate file may contain a chain of certificates. We'll
# need to split that up before we can pass anything to openssl or # need to split that up before we can pass anything to openssl or
...@@ -646,33 +649,7 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring ...@@ -646,33 +649,7 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
# First check that the domain name is one of the names allowed by # First check that the domain name is one of the names allowed by
# the certificate. # the certificate.
if domain is not None: if domain is not None:
# The domain may be found in the Subject Common Name (CN). This comes back as an IDNA (ASCII) certificate_names, cert_primary_name = get_certificate_domains(cert)
# string, which is the format we store domains in - so good.
certificate_names = set()
try:
certificate_names.add(
cert.subject.get_attributes_for_oid(OID_COMMON_NAME)[0].value
)
except IndexError:
# No common name? Certificate is probably generated incorrectly.
# But we'll let it error-out when it doesn't find the domain.
pass
# ... or be one of the Subject Alternative Names. The cryptography library handily IDNA-decodes
# the names for us. We must encode back to ASCII, but wildcard certificates can't pass through
# IDNA encoding/decoding so we must special-case. See https://github.com/pyca/cryptography/pull/2071.
def idna_decode_dns_name(dns_name):
if dns_name.startswith("*."):
return "*." + idna.encode(dns_name[2:]).decode('ascii')
else:
return idna.encode(dns_name).decode('ascii')
try:
sans = cert.extensions.get_extension_for_oid(OID_SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(DNSName)
for san in sans:
certificate_names.add(idna_decode_dns_name(san))
except ExtensionNotFound:
pass
# Check that the domain appears among the acceptable names, or a wildcard # Check that the domain appears among the acceptable names, or a wildcard
# form of the domain name (which is a stricter check than the specs but # form of the domain name (which is a stricter check than the specs but
...@@ -792,6 +769,41 @@ def load_pem(pem): ...@@ -792,6 +769,41 @@ def load_pem(pem):
return load_pem_x509_certificate(pem, default_backend()) return load_pem_x509_certificate(pem, default_backend())
raise ValueError("Unsupported PEM object type: " + pem_type.decode("ascii", "replace")) raise ValueError("Unsupported PEM object type: " + pem_type.decode("ascii", "replace"))
def get_certificate_domains(cert):
from cryptography.x509 import DNSName, ExtensionNotFound, OID_COMMON_NAME, OID_SUBJECT_ALTERNATIVE_NAME
import idna
names = set()
cn = None
# The domain may be found in the Subject Common Name (CN). This comes back as an IDNA (ASCII)
# string, which is the format we store domains in - so good.
try:
cn = cert.subject.get_attributes_for_oid(OID_COMMON_NAME)[0].value
names.add(cn)
except IndexError:
# No common name? Certificate is probably generated incorrectly.
# But we'll let it error-out when it doesn't find the domain.
pass
# ... or be one of the Subject Alternative Names. The cryptography library handily IDNA-decodes
# the names for us. We must encode back to ASCII, but wildcard certificates can't pass through
# IDNA encoding/decoding so we must special-case. See https://github.com/pyca/cryptography/pull/2071.
def idna_decode_dns_name(dns_name):
if dns_name.startswith("*."):
return "*." + idna.encode(dns_name[2:]).decode('ascii')
else:
return idna.encode(dns_name).decode('ascii')
try:
sans = cert.extensions.get_extension_for_oid(OID_SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(DNSName)
for san in sans:
names.add(idna_decode_dns_name(san))
except ExtensionNotFound:
pass
return names, cn
_apt_updates = None _apt_updates = None
def list_apt_updates(apt_update=True): def list_apt_updates(apt_update=True):
# See if we have this information cached recently. # See if we have this information cached recently.
...@@ -1027,7 +1039,8 @@ if __name__ == "__main__": ...@@ -1027,7 +1039,8 @@ if __name__ == "__main__":
domain = env['PRIMARY_HOSTNAME'] domain = env['PRIMARY_HOSTNAME']
if query_dns(domain, "A") != env['PUBLIC_IP']: if query_dns(domain, "A") != env['PUBLIC_IP']:
sys.exit(1) sys.exit(1)
ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, env) ssl_certificates = get_ssl_certificates(env)
ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, ssl_certificates, env)
if not os.path.exists(ssl_certificate): if not os.path.exists(ssl_certificate):
sys.exit(1) sys.exit(1)
cert_status, cert_status_details = check_certificate(domain, ssl_certificate, ssl_key, warn_if_expiring_soon=False) cert_status, cert_status_details = check_certificate(domain, ssl_certificate, ssl_key, warn_if_expiring_soon=False)
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
</tbody> </tbody>
</table> </table>
<p>Advanced:<br>Install a multi-domain or wildcard certificate for the <code>{{hostname}}</code> domain to have it automatically applied to any domains it is valid for.</p> <p>A multi-domain or wildcard certificate will be automatically applied to any domains it is valid for.</p>
<h3 id="ssl_install_header">Install SSL Certificate</h3> <h3 id="ssl_install_header">Install SSL Certificate</h3>
......
...@@ -60,6 +60,9 @@ def get_default_www_redirects(env): ...@@ -60,6 +60,9 @@ def get_default_www_redirects(env):
return sort_domains(www_domains - web_domains - get_domains_with_a_records(env), env) return sort_domains(www_domains - web_domains - get_domains_with_a_records(env), env)
def do_web_update(env): def do_web_update(env):
# Pre-load what SSL certificates we will use for each domain.
ssl_certificates = get_ssl_certificates(env)
# Build an nginx configuration file. # Build an nginx configuration file.
nginx_conf = open(os.path.join(os.path.dirname(__file__), "../conf/nginx-top.conf")).read() nginx_conf = open(os.path.join(os.path.dirname(__file__), "../conf/nginx-top.conf")).read()
...@@ -70,20 +73,20 @@ def do_web_update(env): ...@@ -70,20 +73,20 @@ def do_web_update(env):
template3 = "\trewrite ^(.*) https://$REDIRECT_DOMAIN$1 permanent;\n" template3 = "\trewrite ^(.*) https://$REDIRECT_DOMAIN$1 permanent;\n"
# Add the PRIMARY_HOST configuration first so it becomes nginx's default server. # Add the PRIMARY_HOST configuration first so it becomes nginx's default server.
nginx_conf += make_domain_config(env['PRIMARY_HOSTNAME'], [template0, template1, template2], env) nginx_conf += make_domain_config(env['PRIMARY_HOSTNAME'], [template0, template1, template2], ssl_certificates, env)
# Add configuration all other web domains. # Add configuration all other web domains.
has_root_proxy_or_redirect = get_web_domains_with_root_overrides(env) has_root_proxy_or_redirect = get_web_domains_with_root_overrides(env)
for domain in get_web_domains(env): for domain in get_web_domains(env):
if domain == env['PRIMARY_HOSTNAME']: continue # handled above if domain == env['PRIMARY_HOSTNAME']: continue # handled above
if domain not in has_root_proxy_or_redirect: if domain not in has_root_proxy_or_redirect:
nginx_conf += make_domain_config(domain, [template0, template1], env) nginx_conf += make_domain_config(domain, [template0, template1], ssl_certificates, env)
else: else:
nginx_conf += make_domain_config(domain, [template0], env) nginx_conf += make_domain_config(domain, [template0], ssl_certificates, env)
# Add default www redirects. # Add default www redirects.
for domain in get_default_www_redirects(env): for domain in get_default_www_redirects(env):
nginx_conf += make_domain_config(domain, [template0, template3], env) nginx_conf += make_domain_config(domain, [template0, template3], ssl_certificates, env)
# Did the file change? If not, don't bother writing & restarting nginx. # Did the file change? If not, don't bother writing & restarting nginx.
nginx_conf_fn = "/etc/nginx/conf.d/local.conf" nginx_conf_fn = "/etc/nginx/conf.d/local.conf"
...@@ -104,18 +107,14 @@ def do_web_update(env): ...@@ -104,18 +107,14 @@ def do_web_update(env):
return "web updated\n" return "web updated\n"
def make_domain_config(domain, templates, env): def make_domain_config(domain, templates, ssl_certificates, env):
# GET SOME VARIABLES # GET SOME VARIABLES
# Where will its root directory be for static files? # Where will its root directory be for static files?
root = get_web_root(domain, env) root = get_web_root(domain, env)
# What private key and SSL certificate will we use for this domain? # What private key and SSL certificate will we use for this domain?
ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, env) ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, ssl_certificates, env)
# For hostnames created after the initial setup, ensure we have an SSL certificate
# available. Make a self-signed one now if one doesn't exist.
ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, env)
# ADDITIONAL DIRECTIVES. # ADDITIONAL DIRECTIVES.
...@@ -186,77 +185,140 @@ def get_web_root(domain, env, test_exists=True): ...@@ -186,77 +185,140 @@ def get_web_root(domain, env, test_exists=True):
if os.path.exists(root) or not test_exists: break if os.path.exists(root) or not test_exists: break
return root return root
def get_domain_ssl_files(domain, env, allow_shared_cert=True): def get_ssl_certificates(env):
# What SSL private key will we use? Allow the user to override this, but # Scan all of the installed SSL certificates and map every domain
# in many cases using the same private key for all domains would be fine. # that the certificates are good for to the best certificate for
# Don't allow the user to override the key for PRIMARY_HOSTNAME because # the domain.
# that's what's in the main file.
ssl_key = os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_private_key.pem') from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
ssl_key_is_alt = False from cryptography.x509 import Certificate
alt_key = os.path.join(env["STORAGE_ROOT"], 'ssl/%s/private_key.pem' % safe_domain_name(domain))
if domain != env['PRIMARY_HOSTNAME'] and os.path.exists(alt_key): # The certificates are all stored here:
ssl_key = alt_key ssl_root = os.path.join(env["STORAGE_ROOT"], 'ssl')
ssl_key_is_alt = True
# List all of the files in the SSL directory and one level deep.
# What SSL certificate will we use? def get_file_list():
ssl_certificate_primary = os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_certificate.pem') for fn in os.listdir(ssl_root):
ssl_via = None fn = os.path.join(ssl_root, fn)
if domain == env['PRIMARY_HOSTNAME']: if os.path.isfile(fn):
# For PRIMARY_HOSTNAME, use the one we generated at set-up time. yield fn
ssl_certificate = ssl_certificate_primary elif os.path.isdir(fn):
else: for fn1 in os.listdir(fn):
# For other domains, we'll probably use a certificate in a different path. fn1 = os.path.join(fn, fn1)
ssl_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl/%s/ssl_certificate.pem' % safe_domain_name(domain)) if os.path.isfile(fn1):
yield fn1
# But we can be smart and reuse the main SSL certificate if is has
# a Subject Alternative Name matching this domain. Don't do this if # Remember stuff.
# the user has uploaded a different private key for this domain. private_keys = { }
if not ssl_key_is_alt and allow_shared_cert: certificates = [ ]
from status_checks import check_certificate
if check_certificate(domain, ssl_certificate_primary, None, just_check_domain=True)[0] == "OK": # Scan each of the files to find private keys and certificates.
ssl_certificate = ssl_certificate_primary # We must load all of the private keys first before processing
ssl_via = "Using multi/wildcard certificate of %s." % env['PRIMARY_HOSTNAME'] # certificates so that we can check that we have a private key
# available before using a certificate.
# For a 'www.' domain, see if we can reuse the cert of the parent. from status_checks import load_cert_chain, load_pem
elif domain.startswith('www.'): for fn in get_file_list():
ssl_certificate_parent = os.path.join(env["STORAGE_ROOT"], 'ssl/%s/ssl_certificate.pem' % safe_domain_name(domain[4:])) try:
if os.path.exists(ssl_certificate_parent) and check_certificate(domain, ssl_certificate_parent, None, just_check_domain=True)[0] == "OK": pem = load_pem(load_cert_chain(fn)[0])
ssl_certificate = ssl_certificate_parent except ValueError:
ssl_via = "Using multi/wildcard certificate of %s." % domain[4:] # Not a valid PEM format for a PEM type we care about.
continue
return ssl_key, ssl_certificate, ssl_via
# Remember where we got this object.
def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, env): pem._filename = fn
# For domains besides PRIMARY_HOSTNAME, generate a self-signed certificate if
# a certificate doesn't already exist. See setup/mail.sh for documentation. # Is it a private key?
if isinstance(pem, RSAPrivateKey):
private_keys[pem.public_key().public_numbers()] = pem
# Is it a certificate?
if isinstance(pem, Certificate):
certificates.append(pem)
# Process the certificates.
domains = { }
from status_checks import get_certificate_domains
for cert in certificates:
# What domains is this certificate good for?
cert_domains, primary_domain = get_certificate_domains(cert)
cert._primary_domain = primary_domain
# Is there a private key file for this certificate?
private_key = private_keys.get(cert.public_key().public_numbers())
if not private_key:
continue
cert._private_key = private_key
# Add this cert to the list of certs usable for the domains.
for domain in cert_domains:
domains.setdefault(domain, []).append(cert)
# Sort the certificates to prefer good ones.
import datetime
now = datetime.datetime.utcnow()
ret = { }
for domain, cert_list in domains.items():
cert_list.sort(key = lambda cert : (
# must be valid NOW
cert.not_valid_before <= now <= cert.not_valid_after,
# prefer one that is not self-signed
cert.issuer != cert.subject,
# prefer one with the expiration furthest into the future so
# that we can easily rotate to new certs as we get them
cert.not_valid_after,
# in case a certificate is installed in multiple paths,
# prefer the... lexicographically last one?
cert._filename,
), reverse=True)
cert = cert_list.pop(0)
ret[domain] = {
"private-key": cert._private_key._filename,
"certificate": cert._filename,
"primary-domain": cert._primary_domain,
}
return ret
def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False):
# Get the default paths.
ssl_private_key = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem'))
ssl_certificate = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem'))
if domain == env['PRIMARY_HOSTNAME']: if domain == env['PRIMARY_HOSTNAME']:
return # The primary domain must use the server certificate because
# it is hard-coded in some service configuration files.
# Sanity check. Shouldn't happen. A non-primary domain might use this return ssl_private_key, ssl_certificate, None
# certificate (see above), but then the certificate should exist anyway.
if ssl_certificate == os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_certificate.pem'): wildcard_domain = re.sub("^[^\.]+", "*", domain)
return
if domain in ssl_certificates:
if os.path.exists(ssl_certificate): cert_info = ssl_certificates[domain]
return cert_type = "multi-domain"
elif wildcard_domain in ssl_certificates:
os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True) cert_info = ssl_certificates[wildcard_domain]
cert_type = "wildcard"
# Generate a new self-signed certificate using the same private key that we already have. elif not allow_missing_cert:
# No certificate is available for this domain! Return default files.
# Start with a CSR written to a temporary file. ssl_via = "Using certificate for %s." % env['PRIMARY_HOSTNAME']
with tempfile.NamedTemporaryFile(mode="w") as csr_fp: return ssl_private_key, ssl_certificate, ssl_via
csr_fp.write(create_csr(domain, ssl_key, env)) else:
csr_fp.flush() # since we won't close until after running 'openssl x509', since close triggers delete. # No certificate is available - and warn appropriately.
return None
# 'via' is a hint to the user about which certificate is in use for the domain
if cert_info['certificate'] == os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem'):
# Using the server certificate.
via = "Using same %s certificate as for %s." % (cert_type, env['PRIMARY_HOSTNAME'])
elif cert_info['primary-domain'] != domain and cert_info['primary-domain'] in ssl_certificates and cert_info == ssl_certificates[cert_info['primary-domain']]:
via = "Using same %s certificate as for %s." % (cert_type, cert_info['primary-domain'])
else:
via = None # don't show a hint - show expiration info instead
# And then make the certificate. return cert_info['private-key'], cert_info['certificate'], via
shell("check_call", [
"openssl", "x509", "-req",
"-days", "365",
"-in", csr_fp.name,
"-signkey", ssl_key,
"-out", ssl_certificate])
def create_csr(domain, ssl_key, env): def create_csr(domain, ssl_key, env):
return shell("check_output", [ return shell("check_output", [
...@@ -278,8 +340,8 @@ def install_cert(domain, ssl_cert, ssl_chain, env): ...@@ -278,8 +340,8 @@ def install_cert(domain, ssl_cert, ssl_chain, env):
# Do validation on the certificate before installing it. # Do validation on the certificate before installing it.
from status_checks import check_certificate from status_checks import check_certificate
ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, env, allow_shared_cert=False) ssl_private_key = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem'))
cert_status, cert_status_details = check_certificate(domain, fn, ssl_key) cert_status, cert_status_details = check_certificate(domain, fn, ssl_private_key)
if cert_status != "OK": if cert_status != "OK":
if cert_status == "SELF-SIGNED": if cert_status == "SELF-SIGNED":
cert_status = "This is a self-signed certificate. I can't install that." cert_status = "This is a self-signed certificate. I can't install that."
...@@ -288,7 +350,24 @@ def install_cert(domain, ssl_cert, ssl_chain, env): ...@@ -288,7 +350,24 @@ def install_cert(domain, ssl_cert, ssl_chain, env):
cert_status += " " + cert_status_details cert_status += " " + cert_status_details
return cert_status return cert_status
# Copy the certificate to its expected location. # Where to put it?
if domain == env['PRIMARY_HOSTNAME']:
ssl_certificate = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem'))
else:
# Make a unique path for the certificate.
from status_checks import load_cert_chain, load_pem, get_certificate_domains
from cryptography.hazmat.primitives import hashes
from binascii import hexlify
cert = load_pem(load_cert_chain(fn)[0])
all_domains, cn = get_certificate_domains(cert)
path = "%s-%s-%s" % (
cn, # common name
cert.not_valid_after.date().isoformat().replace("-", ""), # expiration date
hexlify(cert.fingerprint(hashes.SHA256())).decode("ascii")[0:8], # fingerprint prefix
)
ssl_certificate = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', path, 'ssl_certificate.pem'))
# Install the certificate.
os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True) os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)
shutil.move(fn, ssl_certificate) shutil.move(fn, ssl_certificate)
...@@ -314,9 +393,10 @@ def get_web_domains_info(env): ...@@ -314,9 +393,10 @@ def get_web_domains_info(env):
# for the SSL config panel, get cert status # for the SSL config panel, get cert status
def check_cert(domain): def check_cert(domain):
from status_checks import check_certificate from status_checks import check_certificate
ssl_key, ssl_certificate, ssl_via = get_domain_ssl_files(domain, env) ssl_certificates = get_ssl_certificates(env)
if not os.path.exists(ssl_certificate): x = get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=True)
return ("danger", "No Certificate Installed") if x is None: return ("danger", "No Certificate Installed")
ssl_key, ssl_certificate, ssl_via = x
cert_status, cert_status_details = check_certificate(domain, ssl_certificate, ssl_key) cert_status, cert_status_details = check_certificate(domain, ssl_certificate, ssl_key)
if cert_status == "OK": if cert_status == "OK":
if not ssl_via: if not ssl_via:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment