Multi-provider DNS management with Terraform and Pulumi

The Problem

Every DNS provider is very specific about how it creates DNS records. Using Terraform or Pulumi doesn't guarantee multi-provider support out of the box.

One example: AWS Route53 supports binding multiple IP values to the same name record, while Cloudflare requires a dedicated record for each IP.

These API differences make it harder to write code that works across multiple providers.

For AWS Route53, a single record can be created like this:

mydomain.local: IP1, IP2, IP3

For Cloudflare, it would be three different records:

mydomain.local: IP1
mydomain.local: IP2
mydomain.local: IP3

Solution 1: Use the flexibility of a programming language with Pulumi

Pulumi has the upper hand here, since you can use the full power of a programming language to handle custom logic.

DNS data structure:

mydomain1.com: 
 - IP1
 - IP2 
 - IP3
mydomain2.com:
 - IP4
 - IP5 
 - IP6
mydomain3.com: 
 - IP7
 - IP8 
 - IP9

Using Python or JavaScript, we can expand this structure for the Cloudflare provider or keep it as is for AWS Route53.
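
The expansion itself is small in plain Python. Here is a minimal sketch (expand_records is an illustrative helper, not part of either provider's SDK):

def expand_records(dns_config):
    """Turn {domain: [ip, ...]} into one (domain, ip) pair per record,
    for providers like Cloudflare that need a record per IP."""
    return [
        (domain, ip)
        for domain, ips in dns_config.items()
        for ip in ips
    ]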

In the Cloudflare case we will create a new record for each IP:

import pulumi
import pulumi_cloudflare as cloudflare
import yaml

# Load the configuration from a YAML file
yaml_file = "dns_records.yaml"
with open(yaml_file, "r") as file:
    dns_config = yaml.safe_load(file)

# Cloudflare Zone ID (Replace with your actual Cloudflare Zone ID)
zone_id = "your_cloudflare_zone_id"

# Iterate through domains and their associated IPs to create A records
for domain, ips in dns_config.items():
    if isinstance(ips, list):  # Ensure it's a list of IPs
        for ip in ips:
            record_name = domain
            cloudflare.Record(
                f"{record_name}-{ip.replace('.', '-')}",
                zone_id=zone_id,
                name=record_name,
                type="A",
                value=ip,
                ttl=3600,  # Set TTL (adjust as needed)
            )

# Export the created records
pulumi.export("dns_records", dns_config)

And since AWS Route53 supports a list of IPs, the code would look like this:

import pulumi_aws as aws

# Route53 hosted zone ID (replace with your actual hosted zone ID)
hosted_zone_id = "your_route53_hosted_zone_id"

for domain, ips in dns_config.items():
    if isinstance(ips, list) and ips:  # Ensure it's a non-empty list of IPs
        aws.route53.Record(
            f"{domain}-record",
            zone_id=hosted_zone_id,
            name=domain,
            type="A",
            ttl=300,  # Set TTL (adjust as needed)
            records=ips,  # AWS Route 53 supports multiple IPs in a single record
        )

Solution 2: Using the Terraform for_each loop

It’s quite possible to achieve the same using Terraform, starting with version 0.12, which introduced for_each and dynamic blocks.

Same data structure:

mydomain1.com: 
  - 192.168.1.1
  - 192.168.1.2
  - 192.168.1.3
mydomain2.com:
  - 10.0.0.1
  - 10.0.0.2
  - 10.0.0.3
mydomain3.com: 
  - 172.16.0.1
  - 172.16.0.2
  - 172.16.0.3
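
Rather than passing this data through a .tfvars file, one option is to load the same YAML with Terraform's built-in yamldecode function (a sketch; it assumes dns_records.yaml sits next to the module):

locals {
  dns_records = yamldecode(file("${path.module}/dns_records.yaml"))
}

The examples below declare a dns_records variable; with this approach you would reference local.dns_records instead.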

Terraform example for AWS Route53

provider "aws" {
  region = "us-east-1"  # Change this to your preferred region
}

variable "hosted_zone_id" {
  type = string
}

variable "dns_records" {
  type = map(list(string))
}

resource "aws_route53_record" "dns_records" {
  for_each = var.dns_records

  zone_id = var.hosted_zone_id
  name    = each.key
  type    = "A"
  ttl     = 300
  records = each.value
}

Quite simple using the for_each loop, but it will not work for Cloudflare because of the compatibility issue mentioned above: there we need a new record for each IP.

Terraform example for Cloudflare

# Create multiple records for each domain, one per IP
resource "cloudflare_record" "dns_records" {
  for_each = {
    for pair in flatten([
      for domain, ips in var.dns_records : [
        for ip in ips : { domain = domain, ip = ip }
      ]
    ]) : "${pair.domain}-${pair.ip}" => pair
  }

  zone_id = var.cloudflare_zone_id
  name    = each.value.domain
  type    = "A"
  value   = each.value.ip
  ttl     = 3600
  proxied = false  # Set to true if using Cloudflare proxy
}
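
Keying the map by "${pair.domain}-${pair.ip}" gives every record a stable address in Terraform state, so adding or removing a single IP only touches that one record.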

Conclusions

  1. Pulumi: Flexible and easy to start with. Data is separate from code, making it easy to add providers or change logic.
  2. Terraform: Less complex and easier to support long-term, but depends on the data format.
  3. Both solutions require programming skills or expertise in the Terraform language.

Simple LRU cache implementation in Python 3

What is an LRU cache?

LRU (least recently used) is a cache item replacement policy: the least recently used items are discarded first.

Problem

The LRU Cache algorithm requires keeping track of what was used when, which is expensive if one wants to make sure the algorithm always discards the least recently used item.

Solution

Approaching the problem, I was thinking of two data structure capabilities: a FIFO queue and a hash table.

The FIFO queue is responsible for evicting the least recently used items; the hash table is responsible for looking up cached items. Using these data structures makes both operations O(1).

Python's collections.OrderedDict combines both of these capabilities:

  • Queue: dict items are ordered as a FIFO queue, so inserts and evictions are done in O(1)
  • Hash table: dict keys provide access to data in O(1) time

import collections
LRUCache = collections.OrderedDict()

Insert operation:

LRUCache[key] = value

This adds the key to the end of the dict, so the position of new items is always fixed.

Check for hit:

if key in LRUCache:
    LRUCache.move_to_end(key)

Here we are doing two things: 1) checking if the key exists; 2) if it does, moving the key to the end of the dict, so keys that get a hit update their position and become the newest.

Discard or evict operation:

if len(LRUCache) > CACHE_SIZE:
    evict_key, evict_val = LRUCache.popitem(last=False)

The evict operation pops an item from the beginning of the dict, removing the oldest key.
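
Putting the three operations together, a quick sketch on a toy cache of size 2:

import collections

LRUCache = collections.OrderedDict()
CACHE_SIZE = 2

for key in ["a", "b", "a", "c"]:
    if key in LRUCache:
        LRUCache.move_to_end(key)     # hit: the key becomes the newest
    else:
        LRUCache[key] = key.upper()   # miss: insert at the end
    if len(LRUCache) > CACHE_SIZE:
        LRUCache.popitem(last=False)  # evict the oldest key ("b" here)

print(list(LRUCache))  # ['a', 'c']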

Python 3 implementation

"""Simple LRUCache implementation."""

import collections
import os
import random
from math import factorial

CACHE_SIZE = 100
if os.getenv('CACHE_SIZE'):
    CACHE_SIZE = int(os.getenv('CACHE_SIZE'))

SAMPLE_SIZE = 100
if os.getenv('SAMPLE_SIZE'):
    SAMPLE_SIZE = int(os.getenv('SAMPLE_SIZE'))

LRUCache = collections.OrderedDict()


def expensive_call(number):
    """Calculate factorial. Example of expensive call."""
    return factorial(number)


if __name__ == '__main__':

    # Weighted random sample: larger numbers are more likely to be drawn.
    test_cases = random.choices(
        population=range(SAMPLE_SIZE * 3),
        weights=range(SAMPLE_SIZE * 3),
        k=SAMPLE_SIZE
    )

    for test in test_cases:
        if test in LRUCache:
            print("hit:", test, LRUCache[test])
            # Move the hit item to the end so it becomes the newest
            # (this is what makes the cache LRU rather than FIFO).
            LRUCache.move_to_end(test, last=True)
        else:
            LRUCache[test] = expensive_call(test)
            print("miss:", test, LRUCache[test])
        if len(LRUCache) > CACHE_SIZE:
            evict_key, evict_val = LRUCache.popitem(last=False)
            print("evict:", evict_key, evict_val)

As a use case, I used the LRU cache to cache the output of an expensive function call, like factorial.

Sample size and cache size are controllable through environment variables. Try running it with small numbers to see how it behaves:

CACHE_SIZE=4 SAMPLE_SIZE=10 python lru.py

Next steps:

  • Encapsulate the business logic into a class (a minimal sketch follows this list)
  • Add Python magic methods to provide a Pythonic way of dealing with class objects
  • Add unit tests
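
As a starting point, here is a minimal sketch of what that encapsulation could look like (the class shape and method choices are illustrative, not a finished design):

import collections


class LRUCache:
    """Minimal LRU cache backed by collections.OrderedDict."""

    def __init__(self, size=100):
        self.size = size
        self._cache = collections.OrderedDict()

    def __contains__(self, key):
        return key in self._cache

    def __getitem__(self, key):
        # A hit refreshes the key's position to newest.
        self._cache.move_to_end(key)
        return self._cache[key]

    def __setitem__(self, key, value):
        self._cache[key] = value
        self._cache.move_to_end(key)
        # Evict the oldest entry once over capacity.
        if len(self._cache) > self.size:
            self._cache.popitem(last=False)

    def __len__(self):
        return len(self._cache)

With this, the main loop reads naturally: if n in cache, take cache[n]; otherwise assign cache[n] = expensive_call(n).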
