How to create multiple workers to fetch keys from the workqueue and run some business logic on them?


I am new to Go and Kubernetes. I tried to create a custom controller in Go using the client-go library. The controller connects to the Kubernetes API server, brings the pod details into a cache, and pushes keys onto a workqueue, where I perform some actions on the pods. However, I want this processing to be faster, and for that I need multiple workers. How can I create multiple workers that act on the same workqueue and speed up the code?

Below is a sample of my controller:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/golang/glog"

    "k8s.io/apimachinery/pkg/watch"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    rs "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    clientset kubernetes.Interface
    queue     workqueue.RateLimitingInterface
    informer  cache.SharedIndexInformer
}

var (
    //used the config file
    kubeconfig = flag.String("kubeconfig", "location", "absolute path to the kubeconfig file")
)

// CreateSharedIndexInformer creates the SharedIndexInformer that brings the pod details into the cache
func CreateSharedIndexInformer() {
    flag.Parse()
    //creating config using the kubeconfig file
    configuration, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        log.Fatalf("Unable to build config from kubeconfig file: %v", err)
    }

    cs, err := kubernetes.NewForConfig(configuration)
    if err != nil {
        log.Fatalf("Unable to create the clientset: %v", err)
    }

    //Creating the queue
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    
   //Creating the SharedIndexInformer 
    informer := cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (rs.Object, error) {
                return cs.CoreV1().Pods("").List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return cs.CoreV1().Pods("").Watch(context.TODO(), options)
            },
        },
        &v1.Pod{},
        time.Second*10, // resync period of 10 seconds (use 0 to disable resync)
        cache.Indexers{},
    )
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    })

    controller := &Controller{
        clientset: cs,
        queue:     queue,
        informer:  informer,
    }
    stop := make(chan struct{})
    go controller.Run(stop)

    // Wait forever
    select {}
}


func (c *Controller) Run(stopCh chan struct{}) {

    // don't let panics crash the process
    defer runtime.HandleCrash()
    // make sure the work queue is shutdown which will trigger workers to end
    defer c.queue.ShutDown()

    //c.logger.Info("Starting kubewatch controller")

    go c.informer.Run(stopCh)

    // wait for the caches to synchronize before starting the worker
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    //c.logger.Info("Kubewatch controller synced and ready")

    // runWorker will loop until "something bad" happens.  The .Until will
    // then rekick the worker after one second
    go wait.Until(c.runWorker, time.Second, stopCh)
    <-stopCh
}

func (c *Controller) runWorker() {
    // processNextItem will block until there's work available or the queue is shut down
    for c.processNextItem() {
        // continue looping
    }
}

// processNextItem deals with one key off the queue.  It returns false
// when it's time to quit.
func (c *Controller) processNextItem() bool {
    // pull the next work item from queue.  It should be a key we use to lookup
    // something in a cache
    key, quit := c.queue.Get()
    if quit {
        return false
    }

    // you always have to indicate to the queue that you've completed a piece of
    // work
    defer c.queue.Done(key)
    var strKey string
    var ok bool
    if strKey, ok = key.(string); !ok {
        // As the item in the workqueue is actually invalid, we call
        // Forget here, else we'd go into a loop of attempting to
        // process a work item that is invalid.
        c.queue.Forget(key)
        runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", key))
        return true
    }

    // do your work on the key.
    err := c.processBusinessLogic(strKey)

    if err == nil {
        // No error, tell the queue to stop tracking history
        c.queue.Forget(key)
    } else if c.queue.NumRequeues(key) < 10 {
        //c.logger.Errorf("Error processing %s (will retry): %v", key, err)
        // requeue the item to work on later
        c.queue.AddRateLimited(key)
    } else {
        // err != nil and too many retries
        //c.logger.Errorf("Error processing %s (giving up): %v", key, err)
        c.queue.Forget(key)
        runtime.HandleError(err)
    }

    return true
}

func (c *Controller) processBusinessLogic(key string) error {
    obj, exists, err := c.informer.GetIndexer().GetByKey(key)
    if err != nil {
        glog.Errorf("Fetching object with key %s from store failed with %v", key, err)
        return err
    }   

    if !exists {
        // the key is no longer in the cache, which means the Pod was deleted
        fmt.Printf("Pod %s does not exist anymore\n", key)
    } else {
        // Perform some business logic on the Pod (or Deployment)

        // Note that you also have to check the uid if you have a local controlled resource, which
        // is dependent on the actual instance, to detect that a Pod was recreated with the same name
        fmt.Printf("Add event for Pod %s\n", obj.(*v1.Pod).GetName())
    }

    return nil
}

func (c *Controller) handleErr(err error, key interface{}) {
    glog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

func main() {
    CreateSharedIndexInformer()
}

1 Answer

Rui (accepted answer, score 7)

You can just start more workers in your Run function, like this:

func (c *Controller) Run(stopCh chan struct{}) {
    ...
    // runWorker will loop until "something bad" happens.  The .Until will
    // then rekick the worker after one second
    for i := 0; i < 5; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    <-stopCh
}
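
For reference, here is a minimal sketch of the same idea with the worker count passed in as a parameter (the workers argument is my own addition, not part of the original code). The workqueue is safe for concurrent consumers: Get never hands the same key to two workers at once, so the workers need no extra coordination around the queue itself.

func (c *Controller) Run(workers int, stopCh chan struct{}) {
    defer runtime.HandleCrash()
    // shutting down the queue makes queue.Get return quit=true, which stops the workers
    defer c.queue.ShutDown()

    go c.informer.Run(stopCh)

    // wait for the caches to synchronize before starting the workers
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
        return
    }

    // start `workers` goroutines that all consume the same workqueue
    for i := 0; i < workers; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    <-stopCh
}

You would then call it with something like go controller.Run(5, stop). Five is only an example; tune the number of workers to how long processBusinessLogic takes and how much load your API server and rate limiter can absorb.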