使用client-go实现自定义控制器

博客 动态
0 163
优雅殿下
优雅殿下 2022-05-11 10:59:04
悬赏:0 积分 收藏

使用client-go实现自定义控制器

使用client-go实现自定义控制器

介绍

我们已经知道,Service对集群之外暴露服务的主要方式有两种:NodePort和LoadBalancer,但是这两种方式,都有一定的缺点:

  • NodePort方式的缺点是会占用很多集群机器的端口,那么当集群服务变多的时候,这个缺点就愈发明显。
  • LoadBalancer的缺点是每个Service都需要一个LB,浪费,麻烦,并且需要Kubernetes之外的设备的支持。

基于这种现状,Kubernetes提供了Ingress资源对象,Ingress只需要一个NodePort或者一个LB就可以满足暴露多个Service的需求。

客户端首先对 域名 执行 DNS 解析,得到 Ingress Controller 所在节点的 IP,然后客户端向 Ingress Controller 发送 HTTP 请求,然后根据 Ingress 对象里面的描述匹配域名,找到对应的 Service 对象,并获取关联的 Endpoints 列表,将客户端的请求转发给其中一个 Pod。

image

本文我们来使用client-go实现一个自定义控制器,通过判断serviceAnnotations属性是否包含ingress/http字段,如果包含则创建ingress,如果不包含则不创建。而且如果存在ingress则进行删除。

具体实现

首先我们创建项目。

$ mkdir ingress-manager && cd ingress-manager$ go mod init ingress-manager# 由于控制器部分的内容比较多,将它们单独放到pkg目录下$ mkdir pkg# 最终项目目录结构如下.├── go.mod├── go.sum├── main.go└── pkg    └── controller.go

接着我们来实现controller部分:

package pkgimport (	"context"	apiCoreV1 "k8s.io/api/core/v1"	netV1 "k8s.io/api/networking/v1"	"k8s.io/apimachinery/pkg/api/errors"	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"	"k8s.io/apimachinery/pkg/util/runtime"	"k8s.io/apimachinery/pkg/util/wait"	informersCoreV1 "k8s.io/client-go/informers/core/v1"	informersNetV1 "k8s.io/client-go/informers/networking/v1"	"k8s.io/client-go/kubernetes"	coreV1 "k8s.io/client-go/listers/core/v1"	v1 "k8s.io/client-go/listers/networking/v1"	"k8s.io/client-go/tools/cache"	"k8s.io/client-go/util/workqueue"	"reflect"	"time")const (	workNum  = 5  // 工作的节点数	maxRetry = 10 // 最大重试次数 )// 定义控制器type Controller struct {	client        kubernetes.Interface	ingressLister v1.IngressLister	serviceLister coreV1.ServiceLister	queue         workqueue.RateLimitingInterface}// 初始化控制器func NewController(client kubernetes.Interface, serviceInformer informersCoreV1.ServiceInformer, ingressInformer informersNetV1.IngressInformer) Controller {	c := Controller{		client:        client,		ingressLister: ingressInformer.Lister(),		serviceLister: serviceInformer.Lister(),		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),	}	// 添加事件处理函数	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{		AddFunc:    c.addService,		UpdateFunc: c.updateService,	})	ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{		DeleteFunc: c.deleteIngress,	})	return c}// 入队func (c *Controller) enqueue(obj interface{}) {	key, err := cache.MetaNamespaceKeyFunc(obj)	if err != nil {		runtime.HandleError(err)	}	c.queue.Add(key)}func (c *Controller) addService(obj interface{}) {	c.enqueue(obj)}func (c *Controller) updateService(oldObj, newObj interface{}) {	// todo 比较annotation	// 这里只是比较了对象是否相同,如果相同,直接返回	if reflect.DeepEqual(oldObj, newObj) {		return	}	c.enqueue(newObj)}func (c *Controller) deleteIngress(obj interface{}) {	ingress := obj.(*netV1.Ingress)	ownerReference := metaV1.GetControllerOf(ingress)	if ownerReference == nil {		return	}	// 判断是否为真的service	if ownerReference.Kind != "Service" {		return	}	c.queue.Add(ingress.Namespace + "/" + ingress.Name)}// 启动控制器,可以看到开了五个协程,真正干活的是workerfunc (c *Controller) Run(stopCh chan struct{}) {	for i := 0; i < workNum; i++ {		go wait.Until(c.worker, time.Minute, stopCh)	}	<-stopCh}func (c *Controller) worker() {	for c.processNextItem() {	}}// 业务真正处理的地方func (c *Controller) processNextItem() bool {	// 获取key	item, shutdown := c.queue.Get()	if shutdown {		return false	}	defer c.queue.Done(item)  // 调用业务逻辑	err := c.syncService(item.(string))	if err != nil {    // 对错误进行处理		c.handlerError(item.(string), err)		return false	}	return true}func (c *Controller) syncService(item string) error {	namespace, name, err := cache.SplitMetaNamespaceKey(item)	if err != nil {		return err	}	// 获取service	service, err := c.serviceLister.Services(namespace).Get(name)	if err != nil {		if errors.IsNotFound(err) {			return nil		}		return err	}	// 新增和删除	_, ok := service.GetAnnotations()["ingress/http"]	ingress, err := c.ingressLister.Ingresses(namespace).Get(name)	if err != nil && !errors.IsNotFound(err) {		return err	} 	if ok && errors.IsNotFound(err) {		// 创建ingress		ig := c.constructIngress(service)		_, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metaV1.CreateOptions{})		if err != nil {			return err		}	} else if !ok && ingress != nil {		// 删除ingress		err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metaV1.DeleteOptions{})		if err != nil {			return err		}	}	return nil}func (c *Controller) handlerError(key string, err error) {	// 如果出现错误,重新加入队列,最大处理10次	if c.queue.NumRequeues(key) <= maxRetry {		c.queue.AddRateLimited(key)		return	}	runtime.HandleError(err)	c.queue.Forget(key)}func (c *Controller) constructIngress(service *apiCoreV1.Service) *netV1.Ingress {	// 构造ingress	pathType := netV1.PathTypePrefix	ingress := netV1.Ingress{}	ingress.ObjectMeta.OwnerReferences = []metaV1.OwnerReference{		*metaV1.NewControllerRef(service, apiCoreV1.SchemeGroupVersion.WithKind("Service")),	}	ingress.Namespace = service.Namespace	ingress.Name = service.Name	ingress.Spec = netV1.IngressSpec{		Rules: []netV1.IngressRule{			{				Host: "example.com",				IngressRuleValue: netV1.IngressRuleValue{					HTTP: &netV1.HTTPIngressRuleValue{						Paths: []netV1.HTTPIngressPath{							{								Path:     "/",								PathType: &pathType,								Backend: netV1.IngressBackend{									Service: &netV1.IngressServiceBackend{										Name: service.Name,										Port: netV1.ServiceBackendPort{											Number: 80,										},									},								},							},						},					},				},			},		},	}	return &ingress}

接下来我们来实现main:

package mainimport (	"ingress-manager/pkg"	"k8s.io/client-go/informers"	"k8s.io/client-go/kubernetes"	"k8s.io/client-go/rest"	"k8s.io/client-go/tools/clientcmd")func main() {	// 获取config	// 先尝试从集群外部获取,获取不到则从集群内部获取	var config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)	if err != nil {		clusterConfig, err := rest.InClusterConfig()		if err != nil {			panic(err)		}		config = clusterConfig	}	// 通过config创建 clientSet	clientSet, err := kubernetes.NewForConfig(config)	if err != nil {		panic(err)	}	// 通过 client 创建 informer,添加事件处理函数	factory := informers.NewSharedInformerFactory(clientSet, 0)	serviceInformer := factory.Core().V1().Services()	ingressInformer := factory.Networking().V1().Ingresses()	newController := pkg.NewController(clientSet, serviceInformer, ingressInformer)	// 启动 informer	stopCh := make(chan struct{})	factory.Start(stopCh)	factory.WaitForCacheSync(stopCh)	newController.Run(stopCh)}

测试

首先创建deploy和service:

apiVersion: apps/v1kind: Deploymentmetadata:  name: my-nginxspec:  selector:    matchLabels:      app: my-nginx  template:    metadata:      labels:        app: my-nginx    spec:      containers:        - name: my-nginx          image: nginx:1.17.1          ports:            - containerPort: 80---apiVersion: v1kind: Servicemetadata:  name: my-nginx  labels:    app: my-nginxspec:  ports:    - port: 80      protocol: TCP      name: http  selector:    app: my-nginx

创建完成后进行查看:

$ kubectl get deploy,service,ingressNAME                              READY   UP-TO-DATE   AVAILABLE   AGEdeployment.apps/my-nginx          1/1     1            1           7mNAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGEservice/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78dservice/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    7m

上面的命令我分别获取deploy,service,ingress,但是只获取到了deployservice,这符合我们的预期。接着我们给service/m-nginx中的annotations添加ingress/http: nginx

$ kubectl edit service/my-nginxapiVersion: v1kind: Servicemetadata:  annotations:    ingress/http: nginx    kubectl.kubernetes.io/last-applied-configuration: |      {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"my-nginx"},"name":"my-nginx","namespace":"default"},"spec":{"ports":[{"name":"http","port":80,"protocol":"TCP"}],"selector":{"app":"my-nginx"}}}      ......      service/my-nginx edited

重新进行查看:

$ kubectl get deploy,service,ingressNAME                              READY   UP-TO-DATE   AVAILABLE   AGEdeployment.apps/demo-deployment   1/1     1            1           41ddeployment.apps/my-nginx          1/1     1            1           11mNAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGEservice/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78dservice/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    11mNAME                                 CLASS    HOSTS         ADDRESS   PORTS   AGEingress.networking.k8s.io/my-nginx   <none>   example.com             80      19s

接着我们再来测试下,将ingress/http: nginx注释掉,看看ingress是否会自动删除:

$ kubectl get deploy,service,ingressNAME                              READY   UP-TO-DATE   AVAILABLE   AGEdeployment.apps/demo-deployment   1/1     1            1           41ddeployment.apps/my-nginx          1/1     1            1           19mNAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGEservice/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78dservice/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    19m

我们发现和我们预期的效果一样。

如果service被删除了,ingress肯定也是不会存在的。这个这里就不多演示了。有兴趣可以自行测试下。

posted @ 2022-05-11 10:21 李大鹅 阅读(14) 评论(0) 编辑 收藏 举报
回帖
    优雅殿下

    优雅殿下 (王者 段位)

    2018 积分 (2)粉丝 (47)源码

    小小码农,大大世界

     

    温馨提示

    亦奇源码

    最新会员