cloudflare.py 6.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
try:
	import urllib.parse as urlparse
except:
	import urlparse

import requests

from . import common


class CloudFlareDns(common.BaseDns):
    """
    """

    dns_provider_name = "cloudflare"

    def __init__(
        self,
        CLOUDFLARE_EMAIL,
        CLOUDFLARE_API_KEY,
        CLOUDFLARE_API_BASE_URL="https://api.cloudflare.com/client/v4/",
    ):
        self.CLOUDFLARE_DNS_ZONE_ID = None
        self.CLOUDFLARE_EMAIL = CLOUDFLARE_EMAIL
        self.CLOUDFLARE_API_KEY = CLOUDFLARE_API_KEY
        self.CLOUDFLARE_API_BASE_URL = CLOUDFLARE_API_BASE_URL
        self.HTTP_TIMEOUT = 65  # seconds

        if CLOUDFLARE_API_BASE_URL[-1] != "/":
            self.CLOUDFLARE_API_BASE_URL = CLOUDFLARE_API_BASE_URL + "/"
        else:
            self.CLOUDFLARE_API_BASE_URL = CLOUDFLARE_API_BASE_URL
        super(CloudFlareDns, self).__init__()

    def find_dns_zone(self, domain_name):
        self.logger.debug("find_dns_zone")
        url = urlparse.urljoin(self.CLOUDFLARE_API_BASE_URL, "zones?status=active")
        headers = {"X-Auth-Email": self.CLOUDFLARE_EMAIL, "X-Auth-Key": self.CLOUDFLARE_API_KEY}
        find_dns_zone_response = requests.get(url, headers=headers, timeout=self.HTTP_TIMEOUT)
        self.logger.debug(
            "find_dns_zone_response. status_code={0}".format(find_dns_zone_response.status_code)
        )
        if find_dns_zone_response.status_code != 200:
            raise ValueError(
                "Error creating cloudflare dns record: status_code={status_code} response={response}".format(
                    status_code=find_dns_zone_response.status_code,
                    response=self.log_response(find_dns_zone_response),
                )
            )

        result = find_dns_zone_response.json()["result"]
        for i in result:
            if i["name"] in domain_name:
                setattr(self, "CLOUDFLARE_DNS_ZONE_ID", i["id"])
        if isinstance(self.CLOUDFLARE_DNS_ZONE_ID, type(None)):
            raise ValueError(
                "Error unable to get DNS zone for domain_name={domain_name}: status_code={status_code} response={response}".format(
                    domain_name=domain_name,
                    status_code=find_dns_zone_response.status_code,
                    response=self.log_response(find_dns_zone_response),
                )
            )

        self.logger.debug("find_dns_zone_success")

    def create_dns_record(self, domain_name, domain_dns_value):
        self.logger.info("create_dns_record")
        # if we have been given a wildcard name, strip wildcard
        domain_name = domain_name.lstrip("*.")
        self.find_dns_zone(domain_name)

        url = urllib.parse.urljoin(
            self.CLOUDFLARE_API_BASE_URL,
            "zones/{0}/dns_records".format(self.CLOUDFLARE_DNS_ZONE_ID),
        )
        headers = {"X-Auth-Email": self.CLOUDFLARE_EMAIL, "X-Auth-Key": self.CLOUDFLARE_API_KEY}
        body = {
            "type": "TXT",
            "name": "_acme-challenge" + "." + domain_name + ".",
            "content": "{0}".format(domain_dns_value),
        }
        create_cloudflare_dns_record_response = requests.post(
            url, headers=headers, json=body, timeout=self.HTTP_TIMEOUT
        )
        self.logger.debug(
            "create_cloudflare_dns_record_response. status_code={0}. response={1}".format(
                create_cloudflare_dns_record_response.status_code,
                self.log_response(create_cloudflare_dns_record_response),
            )
        )
        if create_cloudflare_dns_record_response.status_code != 200:
            # raise error so that we do not continue to make calls to ACME
            # server
            raise ValueError(
                "Error creating cloudflare dns record: status_code={status_code} response={response}".format(
                    status_code=create_cloudflare_dns_record_response.status_code,
                    response=self.log_response(create_cloudflare_dns_record_response),
                )
            )
        self.logger.info("create_dns_record_end")

    def delete_dns_record(self, domain_name, domain_dns_value):
        self.logger.info("delete_dns_record")

        class MockResponse(object):
            def __init__(self, status_code=200, content="mock-response"):
                self.status_code = status_code
                self.content = content
                super(MockResponse, self).__init__()

            def json(self):
                return {}

        delete_dns_record_response = MockResponse()
        headers = {"X-Auth-Email": self.CLOUDFLARE_EMAIL, "X-Auth-Key": self.CLOUDFLARE_API_KEY}

        dns_name = "_acme-challenge" + "." + domain_name
        list_dns_payload = {"type": "TXT", "name": dns_name}
        list_dns_url = urllib.parse.urljoin(
            self.CLOUDFLARE_API_BASE_URL,
            "zones/{0}/dns_records".format(self.CLOUDFLARE_DNS_ZONE_ID),
        )

        list_dns_response = requests.get(
            list_dns_url, params=list_dns_payload, headers=headers, timeout=self.HTTP_TIMEOUT
        )

        for i in range(0, len(list_dns_response.json()["result"])):
            dns_record_id = list_dns_response.json()["result"][i]["id"]
            url = urllib.parse.urljoin(
                self.CLOUDFLARE_API_BASE_URL,
                "zones/{0}/dns_records/{1}".format(self.CLOUDFLARE_DNS_ZONE_ID, dns_record_id),
            )
            headers = {"X-Auth-Email": self.CLOUDFLARE_EMAIL, "X-Auth-Key": self.CLOUDFLARE_API_KEY}
            delete_dns_record_response = requests.delete(
                url, headers=headers, timeout=self.HTTP_TIMEOUT
            )
            self.logger.debug(
                "delete_dns_record_response. status_code={0}. response={1}".format(
                    delete_dns_record_response.status_code,
                    self.log_response(delete_dns_record_response),
                )
            )
            if delete_dns_record_response.status_code != 200:
                # extended logging for debugging
                # we do not need to raise exception
                self.logger.error(
                    "delete_dns_record_response. status_code={0}. response={1}".format(
                        delete_dns_record_response.status_code,
                        self.log_response(delete_dns_record_response),
                    )
                )

        self.logger.info("delete_dns_record_success")