Commit 242cadeb authored by Joshua Tauberer's avatar Joshua Tauberer

allow dashes in emails during validation, and for aliases allow a much wider...

allow dashes in emails during validation, and for aliases allow a much wider range of characters, fixes #64

* for local mail users, also disallows periods at the beginning or end of the local or domain parts
* Dovecot gets confused if the string contains any unusual characters, so local mail users are restricted to a narrow regex
* for mail aliases Postfix is not confused so use a regex based on RFC 2822
parent f1dac1fe
import subprocess, shutil, os, sqlite3, re import subprocess, shutil, os, sqlite3, re
def validate_email(email, strict):
# There are a lot of characters permitted in email addresses, but
# Dovecot's sqlite driver seems to get confused if there are any
# unusual characters in the address. Bah. Also note that since
# the mailbox path name is based on the email address, the address
# shouldn't be absurdly long and must not have a forward slash.
if len(email) > 255: return False
if strict:
# For Dovecot's benefit, only allow basic characters.
ATEXT = r'[\w\-]'
else:
# Based on RFC 2822 and https://github.com/SyrusAkbary/validate_email/blob/master/validate_email.py,
# these characters are permitted in email address.
ATEXT = r'[\w!#$%&\'\*\+\-/=\?\^`\{\|\}~]' # see 3.2.4
DOT_ATOM_TEXT = ATEXT + r'+(?:\.' + ATEXT + r'+)*' # see 3.2.4
ADDR_SPEC = '^%s@%s$' % (DOT_ATOM_TEXT, DOT_ATOM_TEXT) # see 3.4.1
return re.match(ADDR_SPEC, email)
def open_database(env, with_connection=False): def open_database(env, with_connection=False):
conn = sqlite3.connect(env["STORAGE_ROOT"] + "/mail/users.sqlite") conn = sqlite3.connect(env["STORAGE_ROOT"] + "/mail/users.sqlite")
if not with_connection: if not with_connection:
...@@ -23,7 +45,7 @@ def get_mail_domains(env): ...@@ -23,7 +45,7 @@ def get_mail_domains(env):
return set([get_domain(addr) for addr in get_mail_users(env)] + [get_domain(addr1) for addr1, addr2 in get_mail_aliases(env)]) return set([get_domain(addr) for addr in get_mail_users(env)] + [get_domain(addr1) for addr1, addr2 in get_mail_aliases(env)])
def add_mail_user(email, pw, env): def add_mail_user(email, pw, env):
if not re.match("\w[\w\.]+@\w[\w\.]+$", email): if not validate_email(email, True):
return ("Invalid email address.", 400) return ("Invalid email address.", 400)
# get the database # get the database
...@@ -45,7 +67,12 @@ def add_mail_user(email, pw, env): ...@@ -45,7 +67,12 @@ def add_mail_user(email, pw, env):
# Check if the mailboxes exist before creating them. When creating a user that had previously # Check if the mailboxes exist before creating them. When creating a user that had previously
# been deleted, the mailboxes will still exist because they are still on disk. # been deleted, the mailboxes will still exist because they are still on disk.
existing_mboxes = subprocess.check_output(["doveadm", "mailbox", "list", "-u", email, "-8"]).decode("utf8").split("\n") try:
existing_mboxes = subprocess.check_output(["doveadm", "mailbox", "list", "-u", email, "-8"], stderr=subprocess.STDOUT).decode("utf8").split("\n")
except subprocess.CalledProcessError as e:
c.execute("DELETE FROM users WHERE email=?", (email,))
conn.commit()
return ("Failed to initialize the user: " + e.output.decode("utf8"), 400)
if "INBOX" not in existing_mboxes: subprocess.check_call(["doveadm", "mailbox", "create", "-u", email, "-s", "INBOX"]) if "INBOX" not in existing_mboxes: subprocess.check_call(["doveadm", "mailbox", "create", "-u", email, "-s", "INBOX"])
if "Spam" not in existing_mboxes: subprocess.check_call(["doveadm", "mailbox", "create", "-u", email, "-s", "Spam"]) if "Spam" not in existing_mboxes: subprocess.check_call(["doveadm", "mailbox", "create", "-u", email, "-s", "Spam"])
...@@ -88,6 +115,9 @@ def remove_mail_user(email, env): ...@@ -88,6 +115,9 @@ def remove_mail_user(email, env):
return do_dns_update(env) return do_dns_update(env)
def add_mail_alias(source, destination, env): def add_mail_alias(source, destination, env):
if not validate_email(source, False):
return ("Invalid email address.", 400)
conn, c = open_database(env, with_connection=True) conn, c = open_database(env, with_connection=True)
try: try:
c.execute("INSERT INTO aliases (source, destination) VALUES (?, ?)", (source, destination)) c.execute("INSERT INTO aliases (source, destination) VALUES (?, ?)", (source, destination))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment