Kubernetes operators quick start

Kubernetes operators provide a simple way to extend a Kubernetes cluster with additional functionality.

For example, you might find that you need a custom operator in areas like:

  1. Managing stateful applications: take any database, message queue, or cache as an example; it needs a full lifecycle of operations to be production-ready. For a long time, running databases in Kubernetes was considered an anti-pattern, but with operators things have started to change.
  2. Automating Application Lifecycle Management: the complexity of creating a new application can be overwhelming for your development teams, so why not introduce something simpler, with just a few core fields shared between all applications.
  3. Enhancing Observability: operators like the Prometheus Operator or Alertmanager hide the complexity of managing these tools and provide a simple interface to get started quickly.
  4. Automated Backup, Restore, and Disaster Recovery: For stateful applications, operators can automate the process of taking regular backups, storing them securely, and orchestrating the restoration process in case of data loss or system failure.
  5. CI/CD and GitOps Automation: operators play a crucial role in GitOps workflows. Tools like the Pulumi Kubernetes Operator enable managing infrastructure alongside Kubernetes workloads.
  6. Networking and Security Management: Operators for service meshes like Istio or Linkerd simplify their installation, configuration, and upgrades across the cluster.
  7. Building Platform-as-a-Service (PaaS)-like Capabilities: By abstracting away underlying Kubernetes resources, operators can provide a simplified, application-centric interface for developers, similar to what a PaaS offers.

Writing your own operator

If you are thinking about writing your own operator, it’s good to know what any operator consists of:

API Design

Custom Resource Definition (CRD) Design. The CRD is the API your users (and other tools) will interact with. A well-designed CRD is intuitive, clear, and extensible. A poorly designed one can be confusing and hard to evolve.

// AppoperatorSpec defines the desired state of Appoperator.
type AppoperatorSpec struct {
	// Image specifies the container image for the deployment
	Image string `json:"image,omitempty"`

	// Replicas specifies the container replica count for the deployment
	Replicas *int32 `json:"replicas,omitempty"`
}

// AppoperatorStatus defines the observed state of Appoperator.
type AppoperatorStatus struct {
	// Conditions represent the latest available observations of the Appoperator's state.
	Conditions []metav1.Condition `json:"conditions,omitempty"`
}
  1. Keep the spec (desired state) as simple as possible for the user, abstracting away unnecessary complexity.
  2. Implement robust OpenAPI v3 schema validation in your CRD to catch errors early and guide users.
  3. Design a comprehensive status subresource to provide clear feedback to users about the state of the managed resources and any errors.

Reconciliation Loop Logic

This is the heart of your operator. It’s the code that observes the current state of the system and the desired state (from the CR) and takes action to make them match.

log := logf.FromContext(ctx)

// Fetch the Appoperator instance
var appoperator toolsv1beta1.Appoperator
if err := r.Get(ctx, req.NamespacedName, &appoperator); err != nil {
	if apierrors.IsNotFound(err) {
		// The resource was deleted, nothing to do
		return ctrl.Result{}, nil
	}
	log.Error(err, "Failed to fetch Appoperator")
	return ctrl.Result{}, err
}
// Logic to create deployment
// Logic to compare current deployment with spec
  1. The reconcile function must be idempotent. This means running it multiple times with the same inputs should produce the same outcome without unintended side effects.
  2. Properly handle errors from API calls or other operations. Decide whether an error is terminal or if the reconciliation should be retried.
  3. Ensure you correctly create, update, and delete managed resources (Deployments, Services, etc.). Avoid leaking resources.
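Idempotency (point 1) can be illustrated with a toy model: a plain map stands in for the cluster here, rather than the real client-go API, so the convergence behavior is easy to see and test:

```go
package main

import "fmt"

// reconcile is a toy create-or-update loop: "cluster" maps a resource name
// to its replica count, and "desired" comes from the CR spec. Running it
// any number of times with the same inputs converges to the same state.
func reconcile(cluster map[string]int32, name string, desired int32) string {
	current, exists := cluster[name]
	switch {
	case !exists:
		cluster[name] = desired
		return "created"
	case current != desired:
		cluster[name] = desired
		return "updated"
	default:
		return "unchanged" // no side effects on repeat runs
	}
}

func main() {
	cluster := map[string]int32{}
	fmt.Println(reconcile(cluster, "app", 3)) // created
	fmt.Println(reconcile(cluster, "app", 3)) // unchanged
	fmt.Println(reconcile(cluster, "app", 5)) // updated
}
```

The real controller-runtime equivalent of this pattern is `controllerutil.CreateOrUpdate`, which performs the same observe-then-mutate sequence against the API server.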

State Management

Operators often manage stateful applications or complex workflows. Ensuring the operator correctly understands and transitions between states is vital.

deployment := &appsv1.Deployment{
	ObjectMeta: metav1.ObjectMeta{
		Name:      appoperator.Name,
		Namespace: appoperator.Namespace,
	},
	Spec: appsv1.DeploymentSpec{
		Replicas: appoperator.Spec.Replicas,
		Selector: &metav1.LabelSelector{
			MatchLabels: map[string]string{"app": appoperator.Name},
		},
		Template: corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{"app": appoperator.Name},
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:  "app-container",
						Image: appoperator.Spec.Image,
					},
				},
			},
		},
	},
}
// Get existing deployment
// Compare with CR state and update if necessary
  1. Accurately read the current state of all managed resources from the Kubernetes API server before making decisions.
  2. Implement logic to detect differences between the CR spec and the actual state of the cluster.
  3. Be mindful of potential race conditions, e.g., stale informer caches or concurrent reconciles of the same object.

Owner References and Garbage Collection

Proper use of owner references is essential for ensuring that Kubernetes automatically cleans up resources created by your operator when the parent Custom Resource is deleted.

  1. Ensure that all resources created and managed by the operator have an ownerReference pointing back to the instance of your Custom Resource that “owns” them.
  2. If your operator needs to perform actions before its Custom Resource can be deleted (e.g., de-provisioning an external resource, taking a final backup), use finalizers. This prevents the CR from being deleted until the operator removes the finalizer.
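The finalizer flow from point 2 can be sketched with plain slices modeling `metadata.finalizers`. The helper names and the finalizer string below are hypothetical; real operators typically use `controllerutil.ContainsFinalizer` and `controllerutil.RemoveFinalizer` from controller-runtime:

```go
package main

import "fmt"

// finalizerName is a hypothetical finalizer identifier.
const finalizerName = "tools.example.com/cleanup"

type object struct {
	finalizers []string
	deleting   bool // stands in for a non-nil metadata.deletionTimestamp
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

func remove(list []string, s string) []string {
	var out []string
	for _, v := range list {
		if v != s {
			out = append(out, v)
		}
	}
	return out
}

// reconcileFinalizer mimics the usual pattern: add the finalizer while the
// object is live; on deletion, run cleanup first, then remove the finalizer
// so Kubernetes can finish deleting the CR.
func reconcileFinalizer(obj *object) string {
	if !obj.deleting {
		if !contains(obj.finalizers, finalizerName) {
			obj.finalizers = append(obj.finalizers, finalizerName)
			return "finalizer added"
		}
		return "nothing to do"
	}
	if contains(obj.finalizers, finalizerName) {
		// ... de-provision external resources, take a final backup, etc.
		obj.finalizers = remove(obj.finalizers, finalizerName)
		return "cleanup done, finalizer removed"
	}
	return "deletion can proceed"
}

func main() {
	obj := &object{}
	fmt.Println(reconcileFinalizer(obj)) // finalizer added
	obj.deleting = true
	fmt.Println(reconcileFinalizer(obj)) // cleanup done, finalizer removed
}
```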

Status Reporting

Users rely on the status of your Custom Resource to understand what the operator is doing and whether their application is healthy.

  1. Use standard Kubernetes Conditions (e.g., Type: Ready, Status: True/False/Unknown) to provide a standardized way of reporting status.
  2. Include informative messages and reasons in your status conditions.
  3. Reflect the metadata.generation of the CR that was processed in the status.observedGeneration. This helps users and other tools understand if the status reflects the latest changes to the spec.
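Points 1–3 can be sketched with a hand-rolled condition type. In a real operator you would use `metav1.Condition` and `meta.SetStatusCondition` from apimachinery; the names here are illustrative:

```go
package main

import "fmt"

// condition mirrors the shape of metav1.Condition.
type condition struct {
	Type               string
	Status             string // "True", "False", or "Unknown"
	Reason             string
	Message            string
	ObservedGeneration int64
}

// setCondition upserts a condition by Type, the way meta.SetStatusCondition does.
func setCondition(conds []condition, c condition) []condition {
	for i := range conds {
		if conds[i].Type == c.Type {
			conds[i] = c
			return conds
		}
	}
	return append(conds, c)
}

func main() {
	var status []condition
	status = setCondition(status, condition{
		Type: "Ready", Status: "False",
		Reason: "DeploymentPending", Message: "waiting for replicas",
		ObservedGeneration: 1,
	})
	// A later reconcile of the same generation flips Ready in place
	// instead of appending a duplicate condition.
	status = setCondition(status, condition{
		Type: "Ready", Status: "True",
		Reason: "DeploymentAvailable", Message: "all replicas ready",
		ObservedGeneration: 1,
	})
	fmt.Println(len(status), status[0].Status) // 1 True
}
```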

Versioning

Versioning is a critical aspect of developing and maintaining Kubernetes operators, as it impacts how users interact with your custom APIs and how your operator software evolves. It breaks down into two main areas: CRD (API) versioning and operator software versioning.

A single CRD can define multiple API versions in its spec.versions list. In Kubernetes, all versions must be safely round-tripable through each other. This means that if we convert from version 1 to version 2, and then back to version 1, we must not lose information.

Operator software versioning refers to how you version the operator’s code and container image. Users need to know which version of the operator software supports which features and CRD versions.

Effective versioning requires a disciplined approach. For CRDs, follow Kubernetes API conventions, plan for conversions, and clearly communicate stability and deprecation. For operator software, use semantic versioning, clearly map operator versions to the CRD versions they support, and have a robust upgrade strategy, potentially leveraging tools like OLM. Both aspects are crucial for the long-term health and usability of your operator.
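The round-trip requirement can be sketched with two hypothetical spec versions, where v2 adds a field on top of v1, so converting v1 → v2 → v1 loses nothing (going the other direction, v2's extra field would typically be preserved in an annotation on the v1 object):

```go
package main

import "fmt"

// Two hypothetical versions of the same spec: v2 keeps every v1 field
// and adds Paused on top.
type specV1 struct {
	Image    string
	Replicas int32
}

type specV2 struct {
	Image    string
	Replicas int32
	Paused   bool // new in v2
}

func v1ToV2(in specV1) specV2 { return specV2{Image: in.Image, Replicas: in.Replicas} }
func v2ToV1(in specV2) specV1 { return specV1{Image: in.Image, Replicas: in.Replicas} }

func main() {
	orig := specV1{Image: "nginx:1.27", Replicas: 3}
	roundTripped := v2ToV1(v1ToV2(orig))
	fmt.Println(orig == roundTripped) // true: no information lost
}
```

In a real operator this conversion logic lives in a conversion webhook, which kubebuilder can scaffold for you.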

Create with Kubebuilder

Kubebuilder doesn’t just provide boilerplate code and some level of automation; it is also a great learning resource for getting started with operators. It covers:

  • The structure of Kubernetes APIs and Resources
  • API versioning semantics
  • Self-healing
  • Garbage Collection and Finalizers
  • Declarative vs Imperative APIs
  • Level-Based vs Edge-Based APIs
  • Resources vs Subresources
  • and more.

Commands for generating manifests, running tests, deploying, installing, and more are available to make things easier from the start.

Summary

Kubernetes operators provide a powerful way to extend cluster functionality, primarily by automating the complex lifecycle management of applications (especially stateful ones like databases), enhancing observability, and enabling PaaS-like capabilities.

Developing a robust operator requires careful attention to its API (CRD) design, implementing idempotent reconciliation logic for state management, ensuring proper resource cleanup via owner references, clear status reporting, and disciplined versioning for both the CRD and the operator software.

LLM-Powered Unit Testing: Effortlessly Generate Domain Model Data

Traditional approaches to generating model data for unit tests often involve setting up complex fixtures or writing cumbersome mock classes. This becomes especially challenging when your domain model is tightly coupled with your data model.

class Order(Base):
    # ...
    product_name = Column(String, index=True)
    quantity = Column(Integer)

In more advanced scenarios, your Domain model is decoupled from the Data model, allowing for cleaner architecture and improved maintainability.

class Order(BaseModel):
    product_name: str
    quantity: int

def testOrder(self):
    order = Order(product_name="Milk", quantity=2)
    self.assertEqual(order.quantity, 2)

Using hard-coded data can be a sub-optimal approach. Every modification to the Domain model requires updates to the test suite.

LLM-generated data comes to the rescue. By leveraging models like GPT-4, you can dynamically generate test data that adheres to your Domain model’s schema.

In the following example, we’ll use GPT-4 to generate data for an Electronics Domain model class, ensuring the structure and content align with the model’s expected schema.

# Imports are skipped to demo the core of the solution

class Electronics(BaseModel):
    """ Domain model for Electronics """
    model: str
    price: float = Field(gt=0, description="Price must be greater than 0")

@llm.call(provider="openai", model="gpt-4o", response_model=List[Electronics])
def list_electronics(limit: int) -> List[Electronics]:
    """ Generate N items of products defined as Electronics """
    return f"Generate {limit} number of electronic products"

And the unit tests:

class TestElectronics(unittest.TestCase):
    """ Test class using LLM to generate domain data """

    @classmethod
    def setUpClass(cls):
        """ Generating test data once """
        cls._products = list_electronics(5)

    def test_unique_models(self):
        """ Test product models are unique """
        models = [product.model for product in self._products]
        self.assertEqual(len(models), len(set(models)))

Running the LLM can take a few seconds, so to optimize performance I moved the data generation to the setUpClass method, where it runs once per test class.

Big kudos to mirascope, llm, smartfunc and other libraries for making LLMs much more accessible to everyday applications!

Feel free to try it yourself; all source code is available in the GitHub project.

If you are interested in a command line LLM integration check out How to automate review with gptscript.

Conclusions:

  1. LLMs are becoming much more accessible in day-to-day programming.
  2. Unit test data generation is just one example, but it demonstrates how easy it is to embed LLMs in real-life scenarios.
  3. It is still expensive and time-consuming to call an LLM for each unit test invocation, so a smarter approach is to generate and cache test data.
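Conclusion 3 can be sketched as a thin cache around the generator. The `cached_fixtures` helper and the `fake_generate` stand-in below are hypothetical (the real code would pass `list_electronics` as the generator); the first run persists the LLM response to disk and later runs replay it:

```python
import json
import os

CACHE_PATH = "electronics_fixtures.json"  # hypothetical cache file


def cached_fixtures(limit, generate, cache_path=CACHE_PATH):
    """Return cached fixtures if present; otherwise invoke the expensive
    generator once and persist its output for future test runs."""
    if os.path.exists(cache_path):
        with open(cache_path) as f:
            return json.load(f)
    data = generate(limit)
    with open(cache_path, "w") as f:
        json.dump(data, f)
    return data


def fake_generate(limit):
    """Stand-in for the real list_electronics LLM call."""
    return [{"model": f"model-{i}", "price": 9.99 + i} for i in range(limit)]


if __name__ == "__main__":
    products = cached_fixtures(5, fake_generate)
    print(len(products))  # 5; a second run reads from the JSON cache
```

In `setUpClass` you would call `cached_fixtures(5, list_electronics)`, paying the LLM cost only when the cache file is missing.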

Simple LRU cache implementation on Python 3

What is LRU Cache?

This is a cache item replacement policy whereby the least recently used items are discarded first.

Problem

The LRU Cache algorithm requires keeping track of what was used when, which is expensive if one wants to make sure the algorithm always discards the least recently used item.

Solution

Approaching the problem, I was thinking of two data structures: a FIFO queue and a hash table.

The FIFO queue is responsible for evicting the least recently used items, and the hash table for looking up cached items. Using these data structures makes both operations O(1) in time complexity.

Python’s collections.OrderedDict combines both of these capabilities:

  • Queue: dict items are ordered as a FIFO queue, so inserts and evictions are done in O(1)
  • Hash table: dict keys provide access to data in O(1) time

import collections
LRUCache = collections.OrderedDict()

Insert operation:

LRUCache[key] = value

It will add the key to the end of the dict, so new items are always placed at the end.

Check for hit:

if key in LRUCache:
  LRUCache.move_to_end(key)

Here we are doing two things: 1) checking if the key exists; 2) if it does, moving the key to the end of the dict, so a key that gets a hit updates its position to become the newest key.

Discard or evict operation:

if len(LRUCache) > CACHE_SIZE:
  evict_key, evict_val = LRUCache.popitem(last=False)

The evict operation pops an item from the beginning of the dict, removing the oldest key.

Python 3 implementation

"""Simple LRUCache implementation."""

import collections
import os
import random
from math import factorial

CACHE_SIZE = 100
if os.getenv('CACHE_SIZE'):
    CACHE_SIZE = int(os.getenv('CACHE_SIZE'))

SAMPLE_SIZE = 100
if os.getenv('SAMPLE_SIZE'):
    SAMPLE_SIZE = int(os.getenv('SAMPLE_SIZE'))

LRUCache = collections.OrderedDict()


def expensive_call(number):
    """Calculate factorial. Example of expensive call."""
    return factorial(number)


if __name__ == '__main__':

    test_cases = random.choices(
        [x for x in range(SAMPLE_SIZE*3)],
        [x for x in range(SAMPLE_SIZE*3)],
        k=SAMPLE_SIZE
    )

    for test in test_cases:
        if test in LRUCache:
            print("hit:", test, LRUCache[test])
            # Mark the hit item as most recently used. Optional.
            LRUCache.move_to_end(test, last=True)
        else:
            LRUCache[test] = expensive_call(test)
            print("miss:", test, LRUCache[test])
        if len(LRUCache) > CACHE_SIZE:
            evict_key, evict_val = LRUCache.popitem(last=False)
            print("evict:", evict_key, evict_val)

As a use case, I have used the LRU cache to cache the output of an expensive function call, like factorial.

Sample size and cache size are controllable through environment variables. Try running it with small numbers to see how it behaves:

CACHE_SIZE=4 SAMPLE_SIZE=10 python lru.py

Next steps:

  • Encapsulate business logic into class
  • Add Python magic functions to provide Pythonic way of dealing with class objects
  • Add unit tests
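The first two next steps might look like this: a sketch of an LRUCache class (the name and interface are my own choice, not an established API) wrapping the OrderedDict logic above behind dict-style magic methods:

```python
import collections


class LRUCache:
    """OrderedDict-backed LRU cache with a dict-like interface."""

    def __init__(self, capacity=100):
        self.capacity = capacity
        self._data = collections.OrderedDict()

    def __contains__(self, key):
        return key in self._data

    def __len__(self):
        return len(self._data)

    def __getitem__(self, key):
        # A hit refreshes the key's position, as in the script above.
        self._data.move_to_end(key)
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the oldest key


if __name__ == "__main__":
    cache = LRUCache(capacity=2)
    cache["a"] = 1
    cache["b"] = 2
    _ = cache["a"]   # "a" becomes most recently used
    cache["c"] = 3   # evicts "b", the least recently used
    print("b" in cache, "a" in cache)  # False True
```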

Simple example of golang os.Args and flag module

Hi all,
this is day 1 of 100 days of coding one Go program a day. Wish me luck!

In this example I will show how to parse command line arguments with the flag module and the os.Args slice.

The flag module is very powerful and can be as simple as this:

//...
import "flag"
//...
name := flag.String("user", "", "user name")
flag.Parse()
fmt.Printf("user name is %s\n", *name)

To have the “same” functionality with os.Args you have to do some work. In the following example I added support for --user, -user, and -user=<value> arguments.

//...
import "os"
//...
var userArg string
for index, arg := range os.Args {
	pattern := "-user="
	x := strings.Index(arg, pattern)
	if x > -1 {
		userArg = arg[x+len(pattern):]
		continue
	}
	if arg == "-user" || arg == "--user" {
		if index+1 < len(os.Args) { // guard against -user being the last argument
			userArg = os.Args[index+1]
		}
		continue
	}
}
fmt.Printf("user name is %s", userArg)

Full code:

package main

import (
	"flag"
	"fmt"
	"os"
	"strings"
)

func main() {
	name := flag.String("user", "", "user name")
	flag.Parse()
	fmt.Printf("user name is %s\n", *name)

	var userArg string
	for index, arg := range os.Args {
		pattern := "-user="
		x := strings.Index(arg, pattern)
		if x > -1 {
			userArg = arg[x+len(pattern):]
			continue
		}
		if arg == "-user" || arg == "--user" {
			if index+1 < len(os.Args) { // guard against -user being the last argument
				userArg = os.Args[index+1]
			}
			continue
		}
	}
	fmt.Printf("user name is %s", userArg)
}

The latest code examples are on GitHub.