I am new to Go and Kubernetes. I am trying to write a custom controller in Go using the client-go library. The controller connects to the Kubernetes API server, loads the pod details into a local cache, and pushes keys for those pods onto a workqueue, where I perform some actions on the pods. I want this processing to be fast, and for that I need multiple workers. How can I create multiple workers that consume from the same workqueue so that items are processed in parallel?
Below is a sample of my controller:
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/watch"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
rs "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
// Controller ties together the three pieces the pod controller works with:
// an API client, a work queue of pod keys, and the informer that feeds it.
type Controller struct {
// clientset is the Kubernetes API client created from the kubeconfig.
clientset kubernetes.Interface
// queue holds the namespace/name keys enqueued by the informer's event
// handlers; workers pull keys from it and retry failures with rate limiting.
queue workqueue.RateLimitingInterface
// informer lists and watches Pods and keeps them in a local indexed cache.
informer cache.SharedIndexInformer
}
// kubeconfig is the value of the -kubeconfig command-line flag: the absolute
// path to the kubeconfig file used to build the client configuration.
var kubeconfig = flag.String("kubeconfig", "location", "absolute path to the kubeconfig file")
// Creating the SharedIndexInformer to bring the details into the cache
func CreateSharedIndexInformer() {
flag.Parse()
//creating config using the kubeconfig file
configuration, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
fmt.Println("Unable to find the file")
}
cs, err := kubernetes.NewForConfig(configuration)
//Creating the queue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
pods, err := cs.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
//Creating the SharedIndexInformer
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (rs.Object, error) {
return cs.CoreV1().Pods("").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cs.CoreV1().Pods("").Watch(context.TODO(), options)
},
},
&v1.Pod{},
time.Second*10, //Skip resync
cache.Indexers{},
)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
controller := &Controller{
clientset: cs,
queue: queue,
informer: informer,
}
stop := make(chan struct{})
go controller.Run(stop)
// Wait forever
select {}
}
// Run starts the controller with a single worker and blocks until stopCh is
// closed. It is equivalent to RunWithWorkers(1, stopCh).
func (c *Controller) Run(stopCh chan struct{}) {
	c.RunWithWorkers(1, stopCh)
}

// RunWithWorkers starts the informer, waits for its cache to sync, then
// launches the requested number of worker goroutines that all consume from
// the same work queue. It blocks until stopCh is closed.
//
// A workers value below 1 is treated as 1. The work queue never hands the
// same key to two workers at once, so workers only run in parallel on
// different keys.
func (c *Controller) RunWithWorkers(workers int, stopCh chan struct{}) {
	// don't let panics crash the process
	defer runtime.HandleCrash()
	// make sure the work queue is shut down, which will trigger workers to end
	defer c.queue.ShutDown()

	go c.informer.Run(stopCh)

	// wait for the caches to synchronize before starting the workers
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	if workers < 1 {
		workers = 1
	}
	// Each runWorker loops until the queue is shut down; wait.Until restarts
	// it after one second if it returns while stopCh is still open.
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
}
// runWorker drains the work queue one item at a time and returns as soon as
// processNextItem reports that it is time to quit. processNextItem blocks
// while the queue is empty, so this loop does not spin.
func (c *Controller) runWorker() {
	for {
		if more := c.processNextItem(); !more {
			return
		}
	}
}
// processNextItem deals with one key off the queue. It returns false when the
// queue has been shut down and the worker should quit, and true otherwise.
//
// A key that fails processing is re-queued with rate limiting up to
// maxRetries times; after that it is dropped and the error is reported.
func (c *Controller) processNextItem() bool {
	const maxRetries = 10

	// Pull the next work item from the queue. It should be a key we use to
	// look something up in the cache.
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// The queue must always be told when a piece of work is finished.
	defer c.queue.Done(key)

	name, ok := key.(string)
	if !ok {
		// The item is invalid, so forget it rather than retrying it forever,
		// and skip processing: the type assertion below would otherwise panic.
		c.queue.Forget(key)
		runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", key))
		return true
	}

	err := c.processBusinessLogic(name)
	switch {
	case err == nil:
		// No error, tell the queue to stop tracking history.
		c.queue.Forget(key)
	case c.queue.NumRequeues(key) < maxRetries:
		// Requeue the item to work on later.
		c.queue.AddRateLimited(key)
	default:
		// Too many retries: give up on this key.
		c.queue.Forget(key)
		runtime.HandleError(err)
	}
	return true
}
func (c *Controller) processBusinessLogic(key string) error {
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
glog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
fmt.Printf("Pod %s does not exist anymore\n", key)
} else {
//Perform some business logic over the pods or Deployment
// Note that you also have to check the uid if you have a local controlled resource, which
// is dependent on the actual instance, to detect that a Pod was recreated with the same name
fmt.Printf("Add event for Pod %s\n", obj.(*v1.Pod).GetName())
}
}
}
return nil
}
// handleErr logs, at info level, that the item identified by key is being
// dropped from the queue together with the error that caused it.
func (c *Controller) handleErr(err error, key interface{}) {
	const dropMsg = "Dropping pod %q out of the queue: %v"
	glog.Infof(dropMsg, key, err)
}
// main is the program entry point. It hands control to
// CreateSharedIndexInformer, which sets up the controller and then blocks
// forever on an empty select.
func main() {
CreateSharedIndexInformer()
}
You can just add more workers in your `Run` function, as follows: