Skip to content

Commit

Permalink
增加自定义controller的demo
Browse files Browse the repository at this point in the history
  • Loading branch information
zq2599 committed Mar 31, 2019
1 parent 8532f19 commit 228227c
Show file tree
Hide file tree
Showing 10 changed files with 475 additions and 0 deletions.
214 changes: 214 additions & 0 deletions k8s_customize_controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package main

import (
"fmt"
"time"

"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

bolingcavalryv1 "k8s_customize_controller/pkg/apis/bolingcavalry/v1"
clientset "k8s_customize_controller/pkg/client/clientset/versioned"
studentscheme "k8s_customize_controller/pkg/client/clientset/versioned/scheme"
informers "k8s_customize_controller/pkg/client/informers/externalversions/bolingcavalry/v1"
listers "k8s_customize_controller/pkg/client/listers/bolingcavalry/v1"
)

const controllerAgentName = "student-controller"

const (
SuccessSynced = "Synced"

MessageResourceSynced = "Student synced successfully"
)

// Controller is the controller implementation for Student resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// studentclientset is a clientset for our own API group
studentclientset clientset.Interface

studentsLister listers.StudentLister
studentsSynced cache.InformerSynced

workqueue workqueue.RateLimitingInterface

recorder record.EventRecorder
}

// NewController returns a new student controller
func NewController(
kubeclientset kubernetes.Interface,
studentclientset clientset.Interface,
studentInformer informers.StudentInformer) *Controller {

utilruntime.Must(studentscheme.AddToScheme(scheme.Scheme))
glog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &Controller{
kubeclientset: kubeclientset,
studentclientset: studentclientset,
studentsLister: studentInformer.Lister(),
studentsSynced: studentInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"),
recorder: recorder,
}

glog.Info("Setting up event handlers")
// Set up an event handler for when Student resources change
studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueStudent,
UpdateFunc: func(old, new interface{}) {
oldStudent := old.(*bolingcavalryv1.Student)
newStudent := new.(*bolingcavalryv1.Student)
if oldStudent.ResourceVersion == newStudent.ResourceVersion {
//版本一致,就表示没有实际更新的操作,立即返回
return
}
controller.enqueueStudent(new)
},
DeleteFunc: controller.enqueueStudentForDelete,
})

return controller
}

//在此处开始controller的业务
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

glog.Info("开始controller业务,开始一次缓存数据同步")
if ok := cache.WaitForCacheSync(stopCh, c.studentsSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

glog.Info("worker启动")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

glog.Info("worker已经启动")
<-stopCh
glog.Info("worker已经结束")

return nil
}

func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}

// 取数据处理
func (c *Controller) processNextWorkItem() bool {

obj, shutdown := c.workqueue.Get()

if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
var key string
var ok bool

if key, ok = obj.(string); !ok {

c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// 在syncHandler中处理业务
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}

c.workqueue.Forget(obj)
glog.Infof("Successfully synced '%s'", key)
return nil
}(obj)

if err != nil {
runtime.HandleError(err)
return true
}

return true
}

// 处理
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

// 从缓存中取对象
student, err := c.studentsLister.Students(namespace).Get(name)
if err != nil {
// 如果Student对象被删除了,就会走到这里,所以应该在这里加入执行
if errors.IsNotFound(err) {
glog.Infof("Student对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name)

return nil
}

runtime.HandleError(fmt.Errorf("failed to list student by: %s/%s", namespace, name))

return err
}

glog.Infof("这里是student对象的期望状态: %#v ...", student)
glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)")

c.recorder.Event(student, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}

// 数据先放入缓存,再入队列
func (c *Controller) enqueueStudent(obj interface{}) {
var key string
var err error
// 将对象放入缓存
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}

// 将key放入队列
c.workqueue.AddRateLimited(key)
}

// 删除操作
func (c *Controller) enqueueStudentForDelete(obj interface{}) {
var key string
var err error
// 从缓存中删除指定对象
key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
//再将key放入队列
c.workqueue.AddRateLimited(key)
}
27 changes: 27 additions & 0 deletions k8s_customize_controller/crd/student.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
# metadata.name的内容是由"复数名.分组名"构成,如下,students是复数名,bolingcavalry.k8s.io是分组名
name: students.bolingcavalry.k8s.io
spec:
# 分组名,在REST API中也会用到的,格式是: /apis/分组名/CRD版本
group: bolingcavalry.k8s.io
# list of versions supported by this CustomResourceDefinition
versions:
- name: v1
# 是否有效的开关.
served: true
# 只有一个版本能被标注为storage
storage: true
# 范围是属于namespace的
scope: Namespaced
names:
# 复数名
plural: students
# 单数名
singular: student
# 类型名
kind: Student
# 简称,就像service的简称是svc
shortNames:
- stu
63 changes: 63 additions & 0 deletions k8s_customize_controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"flag"
"time"

"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

clientset "k8s_customize_controller/pkg/client/clientset/versioned"
informers "k8s_customize_controller/pkg/client/informers/externalversions"
"k8s_customize_controller/pkg/signals"
)

var (
masterURL string
kubeconfig string
)

func main() {
flag.Parse()

// 处理信号量
stopCh := signals.SetupSignalHandler()

// 处理入参
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

studentClient, err := clientset.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building example clientset: %s", err.Error())
}

studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30)

//得到controller
controller := NewController(kubeClient, studentClient,
studentInformerFactory.Bolingcavalry().V1().Students())

//启动informer
go studentInformerFactory.Start(stopCh)

//controller开始处理消息
if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}

func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
6 changes: 6 additions & 0 deletions k8s_customize_controller/pkg/apis/bolingcavalry/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bolingcavalry

const (
GroupName = "bolingcavalry.k8s.io"
Version = "v1"
)
4 changes: 4 additions & 0 deletions k8s_customize_controller/pkg/apis/bolingcavalry/v1/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// +k8s:deepcopy-gen=package

// +groupName=bolingcavalry.k8s.io
package v1
39 changes: 39 additions & 0 deletions k8s_customize_controller/pkg/apis/bolingcavalry/v1/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s_customize_controller/pkg/apis/bolingcavalry"
)

var SchemeGroupVersion = schema.GroupVersion{
Group: bolingcavalry.GroupName,
Version: bolingcavalry.Version,
}

var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)

func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}

func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(
SchemeGroupVersion,
&Student{},
&StudentList{},
)

// register the type in the scheme
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
30 changes: 30 additions & 0 deletions k8s_customize_controller/pkg/apis/bolingcavalry/v1/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type Student struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec StudentSpec `json:"spec"`
}

type StudentSpec struct {
name string `json:"name"`
school string `json:"school"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// StudentList is a list of Student resources
type StudentList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`

Items []Student `json:"items"`
}
Loading

0 comments on commit 228227c

Please sign in to comment.