Solving Top-K problem – Leaderboard in Python

What is the Top-K problem?

Think about a real-time gaming leaderboard with a rank system. Determining the rank of a player is the problem we are trying to solve.
This is an implementation of the Real-time Leaderboard solution from the System Design Interview book.
Check out the full source code at the Leaderboard – solving Top K Problem with Python Github project.

Leaderboard requirements

  1. Get the top 10 ranked players
  2. Show the current rank for each player
  3. Show a slice of players around a specific rank

Solution 1: using SQL database

A direct approach is to use a SQL database and compute the rank in real time by counting how many players have a score greater than or equal to the player's score. This solution falls short as soon as we need to scale to a high number of players.
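The counting approach can be sketched with Python's built-in sqlite3 (the table and column names here are illustrative, not the project's actual schema):

```python
import sqlite3

# In-memory database with a simple scores table (illustrative schema)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (name TEXT, points INTEGER)")
conn.executemany(
    "INSERT INTO scores (name, points) VALUES (?, ?)",
    [("alice", 120), ("bob", 90), ("carol", 150)],
)

def rank_of(name: str) -> int:
    """Rank = 1 + number of players with a strictly higher score.

    Every lookup runs a full count over the table, which is why this
    approach does not scale to a large number of players.
    """
    row = conn.execute(
        "SELECT 1 + COUNT(*) FROM scores "
        "WHERE points > (SELECT points FROM scores WHERE name = ?)",
        (name,),
    ).fetchone()
    return row[0]

print(rank_of("alice"))  # 2: only carol scored higher
```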

Solution 2: using Valkey/Redis sorted set

A Valkey sorted set is a special data type that keeps all records sorted according to a “score”. It was made for exactly this kind of problem, and it’s what we are going to use.

Implementation

The Top-k-problem (Leaderboard) project is built using FastAPI, SQLModel and Valkey.

API Design

The API design was borrowed from the System Design Interview book. According to the requirements, it covers CRUD operations and a get-rank operation.

1. Create Score
POST /scores
{
"name": "AwesomeGamer"
}

2. Update Score
PUT /scores
{
"id": 1,
"points": 10
}

3. Get TOP Scores
GET /scores

4. Get user rank
GET /scores/{id}

Players data model and API: SQLModel + FastAPI

I am using a SQLite table to store the user profile and current score of a player. Several SQLModel classes help to customize the API endpoints.

class ScoreBase(SQLModel):
    name: str | None = Field(default=None)

class Score(ScoreBase, table=True):
    id: int | None = Field(default=None, primary_key=True)
    points: int = Field(default=0)
    rank: int = Field(default=0)

class ScoreCreate(ScoreBase):
    pass

class ScorePublic(ScoreBase):
    id: int
    points: int
    name: str
    rank: int

And the APIs, using FastAPI parameters and response models:

@app.post("/scores", response_model=ScorePublic)
def create_score(data: ScoreCreate, session: SessionDep): ...

@app.put("/scores", response_model=ScorePublic)
def update_scores(id: int, points: int, session: SessionDep): ...

@app.get("/scores", response_model=list[ScorePublic])
def get_top_scores(session: SessionDep,
                   offset: int = 0,
                   limit: Annotated[int, Query(le=10)] = 10): ...

@app.get("/scores/{id}", response_model=ScorePublic)
def get_score(id: int, session: SessionDep): ...

Both libraries provide a very flexible and powerful approach to API design.

Rank solution based on Valkey sorted set

From Valkey documentation

A Sorted Set is a collection of unique strings (members) ordered by an associated score. When more than one string has the same score, the strings are ordered lexicographically. Some use cases for sorted sets include:

  • Leaderboards. For example, you can use sorted sets to easily maintain ordered lists of the highest scores in a massive online game.
  • Rate limiters. In particular, you can use a sorted set to build a sliding-window rate limiter to prevent excessive API requests.

ZREVRANGE helps to get the top 10 players sorted in descending order by score for our leaderboard (the range is 0-based and inclusive).

ZREVRANGE leaderboard 0 9

ZREVRANK helps to get a specific player's rank in descending score order.

ZREVRANK leaderboard id

Finally, using ZREVRANK followed by ZREVRANGE, we can also provide a list of players who are near a specific player in the ranking.

ZREVRANK leaderboard 77
# rank 255
ZREVRANGE leaderboard 250 254
# 5 players ranked above
ZREVRANGE leaderboard 256 260
# 5 players ranked below
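To make the semantics concrete, here is a minimal pure-Python model of these three operations (a toy stand-in for illustration, not the Valkey client; score ties are not broken lexicographically here):

```python
def zadd(zset: dict, member: str, score: float) -> None:
    """Add a member or update its score (like ZADD)."""
    zset[member] = score

def zrevrange(zset: dict, start: int, stop: int) -> list:
    """Members ordered by score, highest first (like ZREVRANGE, inclusive range)."""
    ordered = sorted(zset, key=zset.get, reverse=True)
    return ordered[start:stop + 1]

def zrevrank(zset: dict, member: str) -> int:
    """0-based rank of a member in descending score order (like ZREVRANK)."""
    return zrevrange(zset, 0, len(zset) - 1).index(member)

leaderboard = {}
zadd(leaderboard, "alice", 120)
zadd(leaderboard, "bob", 90)
zadd(leaderboard, "carol", 150)

print(zrevrange(leaderboard, 0, 1))  # ['carol', 'alice']
print(zrevrank(leaderboard, "bob"))  # 2
```

The real sorted set keeps members in a skip list, so rank lookups are O(log N) instead of the O(N log N) sort in this toy version.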

Conclusions

  1. FastAPI and SQLModel provide great interfaces for designing APIs with nice domain model integration
  2. The Valkey sorted set is a built-in solution for the Top-K problem
  3. It’s not Python-native, but if you are looking for a more LeetCode-style solution in Python, check out this awesome video

LLM-Powered Unit Testing: Effortlessly Generate Domain Model Data

Traditional approaches to generating model data for unit tests often involve setting up complex fixtures or writing cumbersome mock classes. This becomes especially challenging when your domain model is tightly coupled with your data model.

class Order(Base):
    # ...
    product_name = Column(String, index=True)
    quantity = Column(Integer)

In more advanced scenarios, your Domain model is decoupled from the Data model, allowing for cleaner architecture and improved maintainability.

class Order(BaseModel):
    product_name: str
    quantity: int

def testOrder(self):
    order = Order(product_name="Milk", quantity=2)
    self.assertEqual(order.quantity, 2)

Using hard-coded data is a sub-optimal approach: every modification to the Domain model requires updates to the test suite.

LLM-generated data comes to the rescue. By leveraging models like GPT-4o, you can dynamically generate test data that adheres to your Domain model’s schema.

In the following example, we’ll use GPT-4o to generate data for an Electronics Domain model class, ensuring the structure and content align with the model’s expected schema.

# Imports are skipped to demo the core of the solution

class Electronics(BaseModel):
    """ Domain model for Electronics """
    model: str
    price: float = Field(gt=0, description="Price must be greater than 0")

@llm.call(provider="openai", model="gpt-4o", response_model=List[Electronics])
def list_electronics(limit: int) -> List[Electronics]:
    """ Generate N items of products defined as Electronics """
    return f"Generate {limit} number of electronic products"

And the unittests:

class TestElectronics(unittest.TestCase):
    """ Test class using LLM to generate domain data """

    @classmethod
    def setUpClass(cls):
        """ Generating test data once """
        cls._products = list_electronics(5)

    def test_unique_models(self):
        """ Test product models are unique """
        models = [product.model for product in self._products]
        self.assertEqual(len(models), len(set(models)))

Running the LLM can take a few seconds, so to optimize performance, I moved the data generation to the setUpClass method, which runs once per test class.

Big kudos to mirascope, llm, smartfunc and other libraries for making LLMs much more accessible to everyday applications!

Feel free to try it yourself; all source code is available in the Github project.

If you are interested in a command line LLM integration check out How to automate review with gptscript.

Conclusions:

  1. LLMs have become much more accessible in day-to-day programming.
  2. Unit test data generation is just an example, but it demonstrates how easy it is to embed LLMs in real-life scenarios
  3. It is still expensive and time-consuming to call an LLM on each unit test invocation, so a smarter approach is to generate and cache the test data
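The caching idea from the last point can be sketched with a small helper. The file name and the `generate` callable are illustrative (in this article's setup, `generate` would be `list_electronics`):

```python
import json
from pathlib import Path

CACHE_FILE = Path("llm_test_data.json")

def cached_test_data(generate, limit: int = 5) -> list:
    """Return cached test data if present; otherwise call the LLM once and cache it."""
    if CACHE_FILE.exists():
        return json.loads(CACHE_FILE.read_text())
    data = generate(limit)  # the expensive LLM call happens only on a cache miss
    CACHE_FILE.write_text(json.dumps(data))
    return data

# Demo with a stub generator standing in for the real LLM call
CACHE_FILE.unlink(missing_ok=True)  # start from a clean cache
fake_llm = lambda limit: [{"model": f"Device-{i}", "price": 9.99 + i} for i in range(limit)]
products = cached_test_data(fake_llm, limit=3)
```

On a second run `cached_test_data` returns the cached JSON without touching the LLM, which keeps the test suite fast and cheap; delete the cache file whenever the Domain model changes.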

Multi provider DNS management with Terraform and Pulumi

The Problem

Every DNS provider is very specific about how it creates DNS records. Using Terraform or Pulumi doesn’t guarantee multi-provider support out of the box.

One example: AWS Route53 supports multiple IP values bound to the same name record, whereas Cloudflare requires a dedicated record for each IP.

These API differences make it harder to write code that works for multiple providers.

For AWS Route53 a single record can be created like this:

mydomain.local: IP1, IP2, IP3

For Cloudflare it would be 3 different records:

mydomain.local: IP1
mydomain.local: IP2
mydomain.local: IP3
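The transformation between the two shapes is small enough to express as a pure function (a sketch; the function name is illustrative):

```python
def expand_records(dns_config: dict) -> list:
    """Expand {domain: [ips]} into one (domain, ip) pair per record,
    as Cloudflare requires."""
    return [(domain, ip) for domain, ips in dns_config.items() for ip in ips]

pairs = expand_records({"mydomain.local": ["IP1", "IP2", "IP3"]})
# [('mydomain.local', 'IP1'), ('mydomain.local', 'IP2'), ('mydomain.local', 'IP3')]
```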

Solution 1: Use the flexibility of a programming language with Pulumi

Pulumi has the upper hand here, since you can use the power of a programming language to handle custom logic.

DNS data structure:

mydomain1.com: 
 - IP1
 - IP2 
 - IP3
mydomain2.com:
 - IP4
 - IP5 
 - IP6
mydomain3.com: 
 - IP7
 - IP8 
 - IP9

Using Python or JavaScript, we can expand this structure for the Cloudflare provider, or keep it as is for AWS Route53.

In the Cloudflare case, we will create a new record for each IP:

import pulumi
import pulumi_cloudflare as cloudflare
import yaml

# Load the configuration from a YAML file
yaml_file = "dns_records.yaml"
with open(yaml_file, "r") as file:
    dns_config = yaml.safe_load(file)

# Cloudflare Zone ID (Replace with your actual Cloudflare Zone ID)
zone_id = "your_cloudflare_zone_id"

# Iterate through domains and their associated IPs to create A records
for domain, ips in dns_config.items():
    if isinstance(ips, list):  # Ensure it's a list of IPs
        for ip in ips:
            record_name = domain
            cloudflare.Record(
                f"{record_name}-{ip.replace('.', '-')}",
                zone_id=zone_id,
                name=record_name,
                type="A",
                value=ip,
                ttl=3600,  # Set TTL (adjust as needed)
            )

# Export the created records
pulumi.export("dns_records", dns_config)

And since AWS Route53 supports a list of IPs, the code is simpler:

for domain, ips in dns_config.items():
    if isinstance(ips, list) and ips:  # Ensure it's a list of IPs and not empty
        aws.route53.Record(
            f"{domain}-record",
            zone_id=hosted_zone_id,
            name=domain,
            type="A",
            ttl=300,  # Set TTL (adjust as needed)
            records=ips,  # AWS Route 53 supports multiple IPs in a single record
        )

Solution 2 – Using Terraform’s for_each loop

It’s quite possible to achieve the same using Terraform, starting with version 0.12, which introduced rich for expressions and the for_each meta-argument.

Same data structure:

mydomain1.com: 
  - 192.168.1.1
  - 192.168.1.2
  - 192.168.1.3
mydomain2.com:
  - 10.0.0.1
  - 10.0.0.2
  - 10.0.0.3
mydomain3.com: 
  - 172.16.0.1
  - 172.16.0.2
  - 172.16.0.3

Terraform example for AWS Route53

provider "aws" {
  region = "us-east-1"  # Change this to your preferred region
}

variable "hosted_zone_id" {
  type = string
}

variable "dns_records" {
  type = map(list(string))
}

resource "aws_route53_record" "dns_records" {
  for_each = var.dns_records

  zone_id = var.hosted_zone_id
  name    = each.key
  type    = "A"
  ttl     = 300
  records = each.value
}

Quite simple using a for_each loop, but it will not work for Cloudflare because of the compatibility issue mentioned above: we need a separate record for each IP.

Terraform example for Cloudflare

# Flatten the map into one entry per (domain, IP) pair,
# then create one record per pair
locals {
  dns_pairs = merge([
    for domain, ips in var.dns_records : {
      for ip in ips : "${domain}-${ip}" => { domain = domain, ip = ip }
    }
  ]...)
}

resource "cloudflare_record" "dns_records" {
  for_each = local.dns_pairs

  zone_id = var.cloudflare_zone_id
  name    = each.value.domain
  type    = "A"
  value   = each.value.ip
  ttl     = 3600
  proxied = false  # Set to true if using Cloudflare proxy
}

Conclusions

  1. Pulumi: Flexible and easy to start. Data is separate from code, making it easy to add providers or change logic.
  2. Terraform: Less complex and easier to support long-term, but depends on the data format
  3. Both solutions require programming skills or expertise in Terraform’s configuration language.

Simple LRU cache implementation on Python 3

What is LRU Cache?

This is a cache item replacement policy: the least recently used items are discarded first.

Problem

The LRU Cache algorithm requires keeping track of what was used when, which is expensive if one wants to make sure the algorithm always discards the least recently used item.

Solution

Approaching the problem, I was thinking of the capabilities of two data structures: a FIFO queue and a hash table.

The FIFO queue is responsible for evicting the least recently used items. The hash table is responsible for getting cached items. Using these data structures makes both operations O(1) time complexity.

Python’s collections.OrderedDict combines both of these capabilities:

  • Queue: dict items are ordered as a FIFO queue, so inserts and evictions are done in O(1)
  • Hash table: dict keys provide access to data in O(1) time

import collections
LRUCache = collections.OrderedDict()

Insert operation:

LRUCache[key] = value

It adds the key to the end of the dict, so the position of new items is always known.

Check for hit:

if key in LRUCache:
  LRUCache.move_to_end(key)

Here we are doing two things: 1) checking if the key exists; 2) if it does, moving the key to the end of the dict, so keys that get a hit update their position and become the newest.

Discard or evict operation:

if len(LRUCache) > CACHE_SIZE:
  evict_key, evict_val = LRUCache.popitem(last=False)

The evict operation pops an item from the beginning of the dict, removing the oldest key.

Python 3 implementation

"""Simple LRUCache implementation."""

import collections
import os
import random
from math import factorial

CACHE_SIZE = 100
if os.getenv('CACHE_SIZE'):
    CACHE_SIZE = int(os.getenv('CACHE_SIZE'))

SAMPLE_SIZE = 100
if os.getenv('SAMPLE_SIZE'):
    SAMPLE_SIZE = int(os.getenv('SAMPLE_SIZE'))

LRUCache = collections.OrderedDict()


def expensive_call(number):
    """Calculate factorial. Example of expensive call."""
    return factorial(number)


if __name__ == '__main__':

    test_cases = random.choices(
        [x for x in range(SAMPLE_SIZE*3)],
        [x for x in range(SAMPLE_SIZE*3)],
        k=SAMPLE_SIZE
    )

    for test in test_cases:
        if test in LRUCache:
            print("hit:", test, LRUCache[test])
            # Move the hit item to the end (newest position). Optional.
            LRUCache.move_to_end(test, last=True)
        else:
            LRUCache[test] = expensive_call(test)
            print("miss:", test, LRUCache[test])
        if len(LRUCache) > CACHE_SIZE:
            evict_key, evict_val = LRUCache.popitem(last=False)
            print("evict:", evict_key, evict_val)

As a use case, I used the LRU cache to cache the output of an expensive function call (factorial).

Sample size and cache size are controllable through environment variables. Try running it with small numbers to see how it behaves:

CACHE_SIZE=4 SAMPLE_SIZE=10 python lru.py

Next steps:

  • Encapsulate business logic into class
  • Add Python magic functions to provide Pythonic way of dealing with class objects
  • Add unit test
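As a sketch of the first two next steps, the logic above could be wrapped in a class with dict-style magic methods (one possible design, not the project's final implementation):

```python
import collections

class LRUCache:
    """LRU cache backed by OrderedDict; the oldest items are evicted first."""

    def __init__(self, capacity: int = 100):
        self.capacity = capacity
        self._data = collections.OrderedDict()

    def __contains__(self, key) -> bool:
        return key in self._data

    def __getitem__(self, key):
        # A hit refreshes the key's position to newest
        self._data.move_to_end(key)
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the oldest key

    def __len__(self) -> int:
        return len(self._data)

cache = LRUCache(capacity=2)
cache["a"] = 1
cache["b"] = 2
cache["a"]      # hit: "a" becomes the newest key
cache["c"] = 3  # evicts "b", the least recently used
```

The magic methods keep the call sites identical to the plain-dict version, so the main loop of lru.py would need almost no changes.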
