k8s Eviction Series (6): kube-controller-manager Eviction - NodeLifecycleController Source Code Analysis
Overview
In k8s v1.16, NodeController has been split into NodeIpamController and NodeLifecycleController. This article covers NodeLifecycleController.
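The main functions of NodeLifecycleController are:
(1) Periodically check the heartbeats reported by each node. When a node has reported no heartbeat for a certain interval, the node's Ready condition is set to False or Unknown and, if taint-based eviction is enabled, a NoExecute taint is added to the node;
(2) When taint-based eviction is disabled and a node's Ready condition has been False or Unknown for a configurable period, evict (delete) the pods on that node;
(3) When taint-based eviction is enabled and a node carries a NoExecute taint, immediately evict (delete) the pods that do not tolerate the taint. A pod that does tolerate the taint is evicted (deleted) only after the smallest toleration time among its matching tolerations has elapsed;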
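Source Code Analysis
The source code analysis is divided into three parts:
(1) Startup parameter analysis;
(2) Initialization and related struct analysis;
(3) Processing logic analysis;
1. Startup Parameter Analysis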
// cmd/kube-controller-manager/app/core.go
func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
	lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
		ctx.InformerFactory.Coordination().V1().Leases(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().Nodes(),
		ctx.InformerFactory.Apps().V1().DaemonSets(),
		// node lifecycle controller uses existing cluster role from node-controller
		ctx.ClientBuilder.ClientOrDie("node-controller"),
		ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
		ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
		ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
		ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
		ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
		ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
		ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
		ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
		ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
		utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
	)
	if err != nil {
		return nil, true, err
	}
	go lifecycleController.Run(ctx.Stop)
	return nil, true, nil
}
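Looking at the arguments passed to lifecyclecontroller.NewNodeLifecycleController in the startNodeLifecycleController function above, several of them come from kube-controller-manager (kcm) startup parameters:
(1) ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration;
This is the kcm startup parameter --node-monitor-period, default 5 seconds. It is the period at which NodeLifecycleController syncs the status of node objects (node taints and node conditions);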
fs.DurationVar(&o.NodeMonitorPeriod.Duration, "node-monitor-period", o.NodeMonitorPeriod.Duration,
"The period for syncing NodeStatus in NodeController.")
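(2) ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration;
This is the kcm startup parameter --node-startup-grace-period, default 60 seconds. It is how long a newly started node may remain unresponsive before the controller updates the node object's conditions;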
fs.DurationVar(&o.NodeStartupGracePeriod.Duration, "node-startup-grace-period", o.NodeStartupGracePeriod.Duration,
"Amount of time which we allow starting Node to be unresponsive before marking it unhealthy.")
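(3) ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration;
This is the kcm startup parameter --node-monitor-grace-period, default 40 seconds. Once more than 40s have passed since a node's last heartbeat, the node's conditions are updated to Unknown (the kubelet reports heartbeats by renewing its node lease);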
fs.DurationVar(&o.NodeMonitorGracePeriod.Duration, "node-monitor-grace-period", o.NodeMonitorGracePeriod.Duration,
"Amount of time which we allow running Node to be unresponsive before marking it unhealthy. "+
"Must be N times more than kubelet's nodeStatusUpdateFrequency, "+
"where N means number of retries allowed for kubelet to post node status.")
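(4) ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration;
This is the kcm startup parameter --pod-eviction-timeout, default 5 minutes. It takes effect only when taint-based eviction is disabled: once a node's Ready condition has been False or Unknown for 5 minutes, the pods on that node are evicted (deleted);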
fs.DurationVar(&o.PodEvictionTimeout.Duration, "pod-eviction-timeout", o.PodEvictionTimeout.Duration, "The grace period for deleting pods on failed nodes.")
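(5) ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager;
This is the kcm startup parameter --enable-taint-manager, default true, which starts the taintManager. When a pod already scheduled onto a node does not tolerate the node's NoExecute taint, the TaintManager evicts it. If the value is false, the taintManager is not started and eviction follows --pod-eviction-timeout instead;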
fs.BoolVar(&o.EnableTaintManager, "enable-taint-manager", o.EnableTaintManager, "WARNING: Beta feature. If set to true enables NoExecute Taints and will evict all not-tolerating Pod running on Nodes tainted with this kind of Taints.")
(6) utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions);
This is the kcm startup parameter --feature-gates=TaintBasedEvictions=xxx, default true. It works together with --enable-taint-manager: taint-based eviction is enabled only when both are true;
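(7) ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate;
This is the kcm startup parameter --node-eviction-rate, default 0.1. When a zone in the cluster is healthy (the zone concept is explained in detail below), it is the number of nodes per second for which pod eviction is triggered; 0.1 means pod eviction is triggered for one node every 10s;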
fs.Float32Var(&o.NodeEvictionRate, "node-eviction-rate", 0.1, "Number of nodes per second on which pods are deleted in case of node failure when a zone is healthy (see --unhealthy-zone-threshold for definition of healthy/unhealthy). Zone refers to entire cluster in non-multizone clusters.")
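(8) ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate;
This is the kcm startup parameter --secondary-node-eviction-rate. When the fraction of unhealthy nodes in a zone reaches --unhealthy-zone-threshold (default 0.55), the eviction rate is reduced. If the zone is not a LargeCluster (its node count is less than or equal to --large-cluster-size-threshold, default 50), eviction stops; if it is a LargeCluster, the eviction rate drops to --secondary-node-eviction-rate nodes per second, default 0.01;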
fs.Float32Var(&o.SecondaryNodeEvictionRate, "secondary-node-eviction-rate", 0.01, "Number of nodes per second on which pods are deleted in case of node failure when a zone is unhealthy (see --unhealthy-zone-threshold for definition of healthy/unhealthy). Zone refers to entire cluster in non-multizone clusters. This value is implicitly overridden to 0 if the cluster size is smaller than --large-cluster-size-threshold.")
(9) ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold;
This is the kcm startup parameter --large-cluster-size-threshold, default 50. A zone with more nodes than this value is treated as a LargeCluster; for a zone that is not a LargeCluster, the effective SecondaryNodeEvictionRate is set to 0;
fs.Int32Var(&o.LargeClusterSizeThreshold, "large-cluster-size-threshold", 50, "Number of nodes from which NodeController treats the cluster as large for the eviction logic purposes. --secondary-node-eviction-rate is implicitly overridden to 0 for clusters this size or smaller.")
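(10) ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold;
This is the kcm startup parameter --unhealthy-zone-threshold, the threshold for treating a zone as unhealthy, which determines when the secondary eviction rate applies. The default is 0.55: a zone is unhealthy when at least 3 of its nodes are not Ready (Ready condition not True) and they make up at least 55% of the zone;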
fs.Float32Var(&o.UnhealthyZoneThreshold, "unhealthy-zone-threshold", 0.55, "Fraction of Nodes in a zone which needs to be not Ready (minimum 3) for zone to be treated as unhealthy. ")
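(11) --feature-gates=NodeLease=xxx: default true. Node heartbeats are reported through lease objects, replacing the older approach of updating the node's status, which greatly reduces the load on the apiserver;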
Zone concept
Nodes are grouped into zones according to the region and zone label values on each node object;
Nodes whose region and zone values are both the same belong to the same zone;
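The grouping described above can be sketched as follows. The label keys and the `zoneKey` and `groupByZone` helpers are assumptions made for illustration (the failure-domain.beta.kubernetes.io labels are a common choice in clusters of this era); this is not the controller's own code.

```go
package main

import "fmt"

// Assumed label keys; adjust to the region/zone labels the cluster uses.
const (
	regionLabel = "failure-domain.beta.kubernetes.io/region"
	zoneLabel   = "failure-domain.beta.kubernetes.io/zone"
)

// zoneKey (hypothetical helper) builds one key from a node's region and zone
// labels. Nodes with equal keys belong to the same zone; nodes carrying
// neither label all share the empty key.
func zoneKey(labels map[string]string) string {
	region, zone := labels[regionLabel], labels[zoneLabel]
	if region == "" && zone == "" {
		return ""
	}
	// The NUL byte keeps the key unambiguous if a value contains a colon.
	return region + ":\x00:" + zone
}

// groupByZone maps each zone key to the names of the nodes in that zone.
// The input maps a node name to that node's labels.
func groupByZone(nodes map[string]map[string]string) map[string][]string {
	zones := make(map[string][]string)
	for name, labels := range nodes {
		k := zoneKey(labels)
		zones[k] = append(zones[k], name)
	}
	return zones
}

func main() {
	nodes := map[string]map[string]string{
		"node-a": {regionLabel: "r1", zoneLabel: "z1"},
		"node-b": {regionLabel: "r1", zoneLabel: "z1"},
		"node-c": {regionLabel: "r1", zoneLabel: "z2"},
	}
	for zone, names := range groupByZone(nodes) {
		fmt.Printf("%q: %d node(s)\n", zone, len(names))
	}
}
```

In a cluster without these labels every node falls under the empty key, which matches the flag help text: "Zone refers to entire cluster in non-multizone clusters."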
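Zone states
A zone can be in one of four states:
(1) Initial: the initial state;
(2) FullDisruption: the number of Ready nodes is 0 and the number of not-Ready nodes is greater than 0;
(3) PartialDisruption: the number of not-Ready nodes is greater than 2 and their share is greater than or equal to unhealthyZoneThreshold;
(4) Normal: any situation not covered by the states above;
Note how the secondary eviction rate, i.e. the kcm startup parameter --secondary-node-eviction-rate, affects eviction: when the fraction of unhealthy nodes in a zone reaches --unhealthy-zone-threshold (default 0.55), the eviction rate is reduced. If the zone is not a LargeCluster (its node count is less than or equal to --large-cluster-size-threshold, default 50), eviction stops; if it is a LargeCluster, the eviction rate drops to --secondary-node-eviction-rate nodes per second, default 0.01;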
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
	readyNodes := 0
	notReadyNodes := 0
	for i := range nodeReadyConditions {
		if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
			readyNodes++
		} else {
			notReadyNodes++
		}
	}
	switch {
	case readyNodes == 0 && notReadyNodes > 0:
		return notReadyNodes, stateFullDisruption
	case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
		return notReadyNodes, statePartialDisruption
	default:
		return notReadyNodes, stateNormal
	}
}
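2. Initialization and Related Struct Analysis
2.1 Controller Struct Analysis
Key fields of the Controller struct:
(1) taintManager: the manager responsible for taint-based eviction;
(2) enterPartialDisruptionFunc: returns the eviction rate to use when a zone's state is PartialDisruption (when the node count exceeds largeClusterThreshold it returns secondaryEvictionLimiterQPS, i.e. the kcm startup parameter --secondary-node-eviction-rate; otherwise it returns 0);
(3) enterFullDisruptionFunc: returns the eviction rate to use when a zone's state is FullDisruption (it simply returns the NodeEvictionRate value, i.e. the kcm startup parameter --node-eviction-rate);
(4) computeZoneStateFunc: the method that computes a zone's state, i.e. the ComputeZoneState method shown in the zone states section above;
(5) nodeHealthMap: records the most recently observed status of every node;
(6) zoneStates: records the state of every zone;
(7) nodeMonitorPeriod, nodeStartupGracePeriod, nodeMonitorGracePeriod, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS, largeClusterThreshold, unhealthyZoneThreshold: already covered in the startup parameter analysis above;
(8) runTaintManager: set from the kcm startup parameter --enable-taint-manager; indicates whether to start the taintManager;
(9) useTaintBasedEvictions: indicates whether taint-based eviction is enabled. It is set from the kcm startup parameter --feature-gates=TaintBasedEvictions=xxx (default true) and works together with --enable-taint-manager: taint-based eviction is enabled only when both are true;
Two key queues in the Controller struct (each is a map holding one queue per zone):
(1) zonePodEvictor: a queue of nodes whose pods need to be evicted (used only when taint-based eviction is disabled). When a node's Ready condition has been False or Unknown for podEvictionTimeout, the node is put into this queue; a worker then reads nodes from the queue and evicts the pods on each node;
(2) zoneNoExecuteTainter: a queue of nodes whose taints need to be updated. When a node's Ready condition becomes False or Unknown, the node is put into this queue; a worker then reads nodes from the queue and updates their taints (adding the notReady or unreachable taint);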
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
type Controller struct {
...
taintManager *scheduler.NoExecuteTaintManager
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
// to avoid the problem with time skew across the cluster.
now func() metav1.Time
enterPartialDisruptionFunc func(nodeNum int) float32
enterFullDisruptionFunc func(nodeNum int) float32
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
knownNodeSet map[string]*v1.Node
// per Node map storing last observed health together with a local time when it was observed.
nodeHealthMap *nodeHealthMap
// evictorLock protects zonePodEvictor and zoneNoExecuteTainter.
// TODO(#83954): API calls shouldn't be executed under the lock.
evictorLock sync.Mutex
nodeEvictionMap *nodeEvictionMap
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
nodesToRetry sync.Map
zoneStates map[string]ZoneState
daemonSetStore appsv1listers.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
leaseLister coordlisters.LeaseLister
leaseInformerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
recorder record.EventRecorder
// Value controlling Controller monitoring period, i.e. how often does Controller
// check node health signal posted from kubelet. This value should be lower than
// nodeMonitorGracePeriod.
// TODO: Change node health monitor to watch based.
nodeMonitorPeriod time.Duration
// When node is just created, e.g. cluster bootstrap or node creation, we give
// a longer grace period.
nodeStartupGracePeriod time.Duration
// Controller will not proactively sync node health, but will monitor node
// health signal updated from kubelet. There are 2 kinds of node healthiness
// signals: NodeStatus and NodeLease. NodeLease signal is generated only when
// NodeLease feature is enabled. If it doesn't receive update for this amount
// of time, it will start posting "NodeReady==ConditionUnknown". The amount of
// time before which Controller start evicting pods is controlled via flag
// 'pod-eviction-timeout'.
// Note: be cautious when changing the constant, it must work with
// nodeStatusUpdateFrequency in kubelet and renewInterval in NodeLease
// controller. The node health signal update frequency is the minimal of the
// two.
// There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than the node health signal
// update frequency, where N means number of retries allowed for kubelet to
// post node status/lease. It is pointless to make nodeMonitorGracePeriod
// be less than the node health signal update frequency, since there will
// only be fresh values from Kubelet at an interval of node health signal
// update frequency. The constant must be less than podEvictionTimeout.
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger
// value takes longer for user to see up-to-date node health.
nodeMonitorGracePeriod time.Duration
podEvictionTimeout time.Duration
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// if set to true Controller will start TaintManager that will evict Pods from
// tainted nodes, if they're not tolerated.
runTaintManager bool
// if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// taints instead of evicting Pods itself.
useTaintBasedEvictions bool
nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.RateLimitingInterface
}
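2.2 Initialization Logic Analysis
The main logic of the NewNodeLifecycleController function is:
(1) Initialize the Controller struct, which represents NodeLifecycleController;
(2) Register an EventHandler on podInformer (part of its logic relates to TaintManager);
(3) Check whether the TaintManager is enabled, i.e. whether the --enable-taint-manager startup parameter is set to true. If so, initialize the TaintManager, assign it to the Controller's taintManager field, and then register the TaintManager-related EventHandler on nodeInformer;
(4) Register an EventHandler on nodeInformer;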
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func NewNodeLifecycleController(
leaseInformer coordinformers.LeaseInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
daemonSetInformer appsv1informers.DaemonSetInformer,
kubeClient clientset.Interface,
nodeMonitorPeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorGracePeriod time.Duration,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
secondaryEvictionLimiterQPS float32,
largeClusterThreshold int32,
unhealthyZoneThreshold float32,
runTaintManager bool,
useTaintBasedEvictions bool,
) (*Controller, error) {
...
// (1) Initialize the `Controller` struct;
nc := &Controller{
kubeClient: kubeClient,
now: metav1.Now,
knownNodeSet: make(map[string]*v1.Node),
nodeHealthMap: newNodeHealthMap(),
nodeEvictionMap: newNodeEvictionMap(),
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue),
nodesToRetry: sync.Map{},
zoneStates: make(map[string]ZoneState),
podEvictionTimeout: podEvictionTimeout,
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
nodeUpdateQueue: workqueue.NewNamed("node_lifecycle_controller"),
podUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
}
if useTaintBasedEvictions {
klog.Infof("Controller is using taint based evictions.")
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
nc.computeZoneStateFunc = nc.ComputeZoneState
// (2) Register an `EventHandler` on `podInformer` (part of its logic relates to `TaintManager`);
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
nc.podUpdated(nil, pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(nil, pod)
}
},
UpdateFunc: func(prev, obj interface{}) {
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
nc.podUpdated(prevPod, newPod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(prevPod, newPod)
}
},
DeleteFunc: func(obj interface{}) {
pod, isPod := obj.(*v1.Pod)
// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
if !isPod {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Received unexpected object: %v", obj)
return
}
pod, ok = deletedState.Obj.(*v1.Pod)
if !ok {
klog.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
return
}
}
nc.podUpdated(pod, nil)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(pod, nil)
}
},
})
nc.podInformerSynced = podInformer.Informer().HasSynced
podInformer.Informer().AddIndexers(cache.Indexers{
nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
podIndexer := podInformer.Informer().GetIndexer()
nc.getPodsAssignedToNode = func(nodeName string) ([]*v1.Pod, error) {
objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
if err != nil {
return nil, err
}
pods := make([]*v1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
pods = append(pods, pod)
}
return pods, nil
}
nc.podLister = podInformer.Lister()
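// (3) Check whether the TaintManager is enabled, i.e. whether `--enable-taint-manager` is set to true; if so, initialize the `TaintManager`, assign it to the `taintManager` field of `Controller`, and register the `TaintManager`-related `EventHandler` on nodeInformer;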
if nc.runTaintManager {
podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
})
}
// (4) Register an `EventHandler` on `nodeInformer`;
klog.Infof("Controller will reconcile labels.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
nc.nodeEvictionMap.registerNode(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
nc.nodeUpdateQueue.Add(newNode.Name)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.nodesToRetry.Delete(node.Name)
nc.nodeEvictionMap.unregisterNode(node.Name)
return nil
}),
})
nc.leaseLister = leaseInformer.Lister()
nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
nc.nodeLister = nodeInformer.Lister()
nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
nc.daemonSetStore = daemonSetInformer.Lister()
nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
return nc, nil
}
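3. Processing Logic Analysis
The Run method is the entry point for analyzing NodeLifecycleController's processing logic. Its main logic is:
(1) Wait for the caches of leaseInformer, nodeInformer, podInformer and daemonSetInformer to finish syncing;
(2) If the taint manager is enabled (nc.runTaintManager), start a goroutine that calls nc.taintManager.Run to start the taintManager;
(3) Start 8 goroutines, i.e. 8 workers, that repeatedly call nc.doNodeProcessingPassWorker to process the nc.nodeUpdateQueue queue;
nc.doNodeProcessingPassWorker does two things:
(3-1) Call nc.doNoScheduleTaintingPass, which updates node.Spec.Taints according to node.Status.Conditions and node.Spec.Unschedulable, mainly setting taints whose Effect is NoSchedule;
(3-2) Call nc.reconcileNodeLabels, which handles the os- and arch-related labels on the node object;
(4) Start 4 goroutines, i.e. 4 workers, that repeatedly call nc.doPodProcessingWorker to process the nc.podUpdateQueue queue;
nc.doPodProcessingWorker performs the following two operations:
(4-1) When taint-based eviction is disabled, inspect the node object's status; when the node's Ready condition has been False or Unknown for at least nc.podEvictionTimeout, evict (delete) the pods on that node;
(4-2) If the node's Ready condition is not True, update the pod's Ready condition to False;
(5) Check whether nc.useTaintBasedEvictions is true, i.e. whether taint-based eviction is enabled; if so, start a goroutine that repeatedly calls nc.doNoExecuteTaintingPass;
nc.doNoExecuteTaintingPass mainly updates node.Spec.Taints according to node.Status.Conditions, setting taints whose Effect is NoExecute;
(6) When taint-based eviction is disabled, start a goroutine that repeatedly calls nc.doEvictionPass;
nc.doEvictionPass takes nodes from nc.zonePodEvictor and evicts (deletes) every pod on each node except daemonset pods;
(7) Start a goroutine that calls nc.monitorNodeHealth every nc.nodeMonitorPeriod (i.e. the kcm startup parameter --node-monitor-period, default 5 seconds);
nc.monitorNodeHealth continuously monitors node status. Based on the number of unhealthy nodes in each zone of the cluster and the eviction-rate settings among the kcm startup parameters, it sets a separate eviction rate for each zone (the rate applies whether or not taint-based eviction is enabled). When a node's last heartbeat (the renew time of its node lease) is older than nodeMonitorGracePeriod (or nodeStartupGracePeriod for a node that has just started), it updates the node object's Ready condition and handles eviction accordingly:
(7-1) When taint-based eviction is enabled and the node's Ready condition is not True, a NoExecute taint is added and the node is put into zoneNoExecuteTainter; the taintManager then performs the eviction;
(7-2) When taint-based eviction is disabled and the node's Ready condition has not been True for podEvictionTimeout, pod eviction begins;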
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
defer nc.podUpdateQueue.ShutDown()
// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
}
for i := 0; i < podUpdateWorkerSize; i++ {
go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// Incorporate the results of node health signal pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
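3.1 nc.taintManager.Run
The main job of taintManager: once a node is given a NoExecute taint, taintManager evicts the pods on it that do not tolerate the taint, and newly created pods must also tolerate the taint before they can be scheduled onto the node;
The kcm startup parameter --enable-taint-manager determines whether taintManager is started; it is started when the value is true (the default);
The kcm startup parameter --feature-gates=TaintBasedEvictions=xxx defaults to true and works together with --enable-taint-manager: taint-based eviction is enabled only when both are true;
taintManager involves quite a lot of material and will be analyzed in a separate article later;
3.2 nc.doNodeProcessingPassWorker
nc.doNodeProcessingPassWorker does two things:
(1) Call nc.doNoScheduleTaintingPass, which updates node.Spec.Taints according to node.Status.Conditions and node.Spec.Unschedulable, mainly setting taints whose Effect is NoSchedule;
(2) Call nc.reconcileNodeLabels, which handles the os- and arch-related labels on the node object;
Main logic:
(1) Consume the nodeUpdateQueue queue in a loop, taking one nodeName from the queue;
(2) Call nc.doNoScheduleTaintingPass to process the node's taints;
(3) Call nc.reconcileNodeLabels to process the os- and arch-related labels on the node object;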
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doNodeProcessingPassWorker() {
	for {
		obj, shutdown := nc.nodeUpdateQueue.Get()
		// "nodeUpdateQueue" will be shutdown when "stopCh" closed;
		// we do not need to re-check "stopCh" again.
		if shutdown {
			return
		}
		nodeName := obj.(string)
		if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
			klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
			// TODO(k82cn): Add nodeName back to the queue
		}
		// TODO: re-evaluate whether there are any labels that need to be
		// reconcile in 1.19. Remove this function if it's no longer necessary.
		if err := nc.reconcileNodeLabels(nodeName); err != nil {
			klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
			// TODO(yujuhong): Add nodeName back to the queue
		}
		nc.nodeUpdateQueue.Done(nodeName)
	}
}
3.2.1 nc.doNoScheduleTaintingPass
nc.doNoScheduleTaintingPass mainly updates node.Spec.Taints according to node.Status.Conditions and node.Spec.Unschedulable, setting taints whose Effect is NoSchedule;
Main logic:
(1) Call nc.nodeLister.Get to fetch the node object from the informer's local cache; return if it does not exist;
(2) Derive the corresponding taints from node.Status.Conditions;
(2-1) node.Status.Conditions contains a condition of type Ready. If its status is False, set a taint with key node.kubernetes.io/not-ready and Effect NoSchedule; if its status is Unknown, set a taint with key node.kubernetes.io/unreachable and Effect NoSchedule;
(2-2) node.Status.Conditions contains a condition of type MemoryPressure. If its status is True, set a taint with key node.kubernetes.io/memory-pressure and Effect NoSchedule;
(2-3) node.Status.Conditions contains a condition of type DiskPressure. If its status is True, set a taint with key node.kubernetes.io/disk-pressure and Effect NoSchedule;
(2-4) node.Status.Conditions contains a condition of type NetworkUnavailable. If its status is True, set a taint with key node.kubernetes.io/network-unavailable and Effect NoSchedule;
(2-5) node.Status.Conditions contains a condition of type PIDPressure. If its status is True, set a taint with key node.kubernetes.io/pid-pressure and Effect NoSchedule;
(3) If node.Spec.Unschedulable is true, also append to the taints a taint with key node.kubernetes.io/unschedulable and Effect NoSchedule;
(4) Call nodeutil.SwapNodeControllerTaint to update the node's taints;
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
var(
...
nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
v1.NodeReady: {
v1.ConditionFalse: v1.TaintNodeNotReady,
v1.ConditionUnknown: v1.TaintNodeUnreachable,
},
v1.NodeMemoryPressure: {
v1.ConditionTrue: v1.TaintNodeMemoryPressure,
},
v1.NodeDiskPressure: {
v1.ConditionTrue: v1.TaintNodeDiskPressure,
},
v1.NodeNetworkUnavailable: {
v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
},
v1.NodePIDPressure: {
v1.ConditionTrue: v1.TaintNodePIDPressure,
},
}
...
)
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// Map node's condition to Taints.
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
if node.Spec.Unschedulable {
// If unschedulable, append related taint.
taints = append(taints, v1.Taint{
Key: v1.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
// Get exist taints of node.
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
// only NoSchedule taints are candidates to be compared with "taints" later
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
// Find unschedulable taint of node.
if t.Key == v1.TaintNodeUnschedulable {
return true
}
// Find node condition taints of node.
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// If nothing to add not delete, return true directly.
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
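The add/delete computation at the end of doNoScheduleTaintingPass can be pictured with the following sketch. It is a simplified re-implementation for illustration, not the taintutils code: the `taint` type and the `taintSetDiff` function are hypothetical, and taints are assumed to be identified by key and effect only.

```go
package main

import "fmt"

// taint is a hypothetical, simplified taint identified by key and effect.
type taint struct {
	key    string
	effect string
}

// taintSetDiff compares the taints the node should have (desired) with the
// taints it currently has (current) and returns what to add and what to
// delete so that current becomes desired.
func taintSetDiff(desired, current []taint) (toAdd, toDel []taint) {
	want := make(map[taint]bool, len(desired))
	for _, t := range desired {
		want[t] = true
	}
	have := make(map[taint]bool, len(current))
	for _, t := range current {
		have[t] = true
	}
	for _, t := range desired {
		if !have[t] {
			toAdd = append(toAdd, t)
		}
	}
	for _, t := range current {
		if !want[t] {
			toDel = append(toDel, t)
		}
	}
	return toAdd, toDel
}

func main() {
	// The node just became NotReady, and its memory pressure has cleared.
	desired := []taint{{"node.kubernetes.io/not-ready", "NoSchedule"}}
	current := []taint{{"node.kubernetes.io/memory-pressure", "NoSchedule"}}

	toAdd, toDel := taintSetDiff(desired, current)
	fmt.Println("add:", toAdd)
	fmt.Println("delete:", toDel)
}
```

When both results are empty the node is already in the desired state, which corresponds to the early return in the method above.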
3.2.2 nc.reconcileNodeLabels
nc.reconcileNodeLabels mainly handles the os- and arch-related labels on the node object; the kubelet sets these labels when it registers the node with the apiserver;
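Since the source of this method is not listed in this article, here is a minimal sketch of the label reconciliation it performs: each beta os/arch label is mirrored to its stable counterpart. The `reconcilePairs` table and the `labelsToUpdate` function are hypothetical names, and the sketch only computes the labels to set rather than patching a node.

```go
package main

import "fmt"

// reconcilePairs lists (source label, label that must mirror it).
var reconcilePairs = []struct{ primary, secondary string }{
	{"beta.kubernetes.io/os", "kubernetes.io/os"},
	{"beta.kubernetes.io/arch", "kubernetes.io/arch"},
}

// labelsToUpdate returns the labels that have to be set on a node so that
// every secondary label exists and carries the same value as its primary.
// An empty result means the node needs no update.
func labelsToUpdate(labels map[string]string) map[string]string {
	updates := map[string]string{}
	for _, p := range reconcilePairs {
		value, ok := labels[p.primary]
		if !ok {
			continue // nothing to mirror
		}
		if current, exists := labels[p.secondary]; !exists || current != value {
			updates[p.secondary] = value
		}
	}
	return updates
}

func main() {
	nodeLabels := map[string]string{
		"beta.kubernetes.io/os":   "linux",
		"beta.kubernetes.io/arch": "amd64",
		"kubernetes.io/os":        "linux", // already in sync
	}
	fmt.Println(labelsToUpdate(nodeLabels))
}
```

For the labels above, only kubernetes.io/arch needs to be set, because kubernetes.io/os already matches its beta counterpart.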
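Main logic:
(1) Fetch the node object from the informer cache; return if it does not exist;
(2) If the node has no labels, return;
(3) If the node object has a label with key "beta.kubernetes.io/os", set a label with key "kubernetes.io/os" and the same value;
(4) If the node object has a label with key "beta.kubernetes.io/arch", set a label with key "kubernetes.io/arch" and the same value;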
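3.3 nc.doPodProcessingWorker
nc.doPodProcessingWorker inspects the node object's status. When the node's Ready condition has been False or Unknown for at least nc.podEvictionTimeout, it evicts (deletes) the pods on that node; in addition, if the node's Ready condition is not True, it updates the pod's Ready condition to False;
Note that when taint-based eviction is enabled, pod eviction is handled by the taint manager, so no pod eviction is done here.
Main logic:
(1) Consume the podUpdateQueue queue in a loop, taking one podUpdateItem (the pod's namespace and name) from the queue;
(2) Call nc.processPod for further processing;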
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doPodProcessingWorker() {
	for {
		obj, shutdown := nc.podUpdateQueue.Get()
		// "podUpdateQueue" will be shutdown when "stopCh" closed;
		// we do not need to re-check "stopCh" again.
		if shutdown {
			return
		}
		podItem := obj.(podUpdateItem)
		nc.processPod(podItem)
	}
}
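3.3.1 nc.processPod
Main logic of nc.processPod:
(1) Fetch the pod object from the informer's local cache;
(2) Get the name of the node the pod runs on and call nc.nodeHealthMap.getDeepCopy with it to obtain nodeHealth; return if nodeHealth is nil;
(3) Call nodeutil.GetNodeCondition to get the node's Ready condition from nodeHealth.status; return if it cannot be found;
(4) Check whether taint-based eviction is enabled (nc.useTaintBasedEvictions); if it is not, call nc.processNoTaintBaseEviction to process the pod further (the eviction logic);
(5) If the node's Ready condition is not True, call nodeutil.MarkPodsNotReady to update the pod's Ready condition to False;
Note: when taint-based eviction is enabled, pod eviction is handled by the taint manager, so it is not handled here.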
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processPod(podItem podUpdateItem) {
defer nc.podUpdateQueue.Done(podItem)
pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name)
if err != nil {
if apierrors.IsNotFound(err) {
// If the pod was deleted, there is no need to requeue.
return
}
klog.Warningf("Failed to read pod %v/%v: %v.", podItem.namespace, podItem.name, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
nodeName := pod.Spec.NodeName
nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName)
if nodeHealth == nil {
// Node data is not gathered yet or node has beed removed in the meantime.
// Pod will be handled by doEvictionPass method.
return
}
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
klog.Warningf("Failed to read node %v: %v.", nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
_, currentReadyCondition := nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
if currentReadyCondition == nil {
// Lack of NodeReady condition may only happen after node addition (or if it will be maliciously deleted).
// In both cases, the pod will be handled correctly (evicted if needed) during processing
// of the next node update event.
return
}
pods := []*v1.Pod{pod}
// In taint-based eviction mode, only node updates are processed by NodeLifecycleController.
// Pods are processed by TaintManager.
if !nc.useTaintBasedEvictions {
if err := nc.processNoTaintBaseEviction(node, currentReadyCondition, nc.nodeMonitorGracePeriod, pods); err != nil {
klog.Warningf("Unable to process pod %+v eviction from node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
}
if currentReadyCondition.Status != v1.ConditionTrue {
if err := nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
klog.Warningf("Unable to mark pod %+v NotReady on node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
}
}
}
3.3.2 nc.processNoTaintBaseEviction
Main logic of nc.processNoTaintBaseEviction:
(1) When the node's Ready condition has been False or Unknown for at least nc.podEvictionTimeout, call nc.evictPods to add the node to the nc.zonePodEvictor queue; other workers consume that queue and evict (delete) the pods on the node;
(2) When the node's Ready condition is True, call nc.cancelPodEviction to remove the node from the nc.zonePodEvictor queue, which cancels the eviction of the pods on that node;
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error {
decisionTimestamp := nc.now()
nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
if nodeHealthData == nil {
return fmt.Errorf("health data doesn't exist for node %q", node.Name)
}
// Check eviction timeout against decisionTimestamp
switch observedReadyCondition.Status {
case v1.ConditionFalse:
if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nodeHealthData.readyTransitionTimestamp,
nc.podEvictionTimeout,
)
}
}
case v1.ConditionUnknown:
if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nodeHealthData.readyTransitionTimestamp,
nc.podEvictionTimeout-gracePeriod,
)
}
}
case v1.ConditionTrue:
if nc.cancelPodEviction(node) {
klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
return nil
}
3.3.3 nc.evictPods
The source of nc.evictPods is not listed here. All you need to know is that it adds the node to the nc.zonePodEvictor queue, which other workers consume to evict (delete) the pods on that node.
3.3.4 Consuming the nc.zonePodEvictor queue: nc.doEvictionPass
The nc.doEvictionPass method consumes the nc.zonePodEvictor queue and calls nodeutil.DeletePods to evict (delete) the pods on the node.
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
...
remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
...
})
}
}
// pkg/controller/util/node/controller_utils.go
func DeletePods(...) ... {
...
for i := range pods {
if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
...
}
...
}
3.4 nc.doNoExecuteTaintingPass
The nc.doNoExecuteTaintingPass method updates node.Spec.Taints according to node.Status.Conditions, mainly by setting taints whose Effect is NoExecute.
Its main logic is to loop over nc.zoneNoExecuteTainter and process each entry:
(1) Take a node out of nc.zoneNoExecuteTainter, then fetch the node object from the informer's local cache;
(2) Read the node's ready condition: if it is false, build a taint with key node.kubernetes.io/not-ready and Effect NoExecute; if it is unknown, build a taint with key node.kubernetes.io/unreachable and Effect NoExecute;
(3) Finally, call nodeutil.SwapNodeControllerTaint to write the constructed taint to the node object (note that only one of these two NoExecute taints exists on a node at any given time: adding one removes the other).
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainter {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
return true, 0
} else if err != nil {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
// retry in 50 millisecond
return false, 50 * time.Millisecond
}
_, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
switch condition.Status {
case v1.ConditionFalse:
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
case v1.ConditionUnknown:
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
default:
// It seems that the Node is ready again, so there's no need to taint it.
klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
return true, 0
}
result := nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
if result {
//count the evictionsNumber
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
return result, 0
})
}
}
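The mutual exclusion between the two taints can be illustrated with a minimal sketch. The pickTaints and swapTaint helpers below are hypothetical and work on plain taint keys rather than v1.Taint objects; they only show the "add one, remove the other" idea, not the real nodeutil.SwapNodeControllerTaint implementation.

```go
package main

import "fmt"

const (
	notReadyKey    = "node.kubernetes.io/not-ready"
	unreachableKey = "node.kubernetes.io/unreachable"
)

// pickTaints returns the taint key to add and the opposite key to remove
// for a given ready condition status. ok is false when no taint is needed.
func pickTaints(readyStatus string) (add, remove string, ok bool) {
	switch readyStatus {
	case "False":
		return notReadyKey, unreachableKey, true
	case "Unknown":
		return unreachableKey, notReadyKey, true
	default:
		return "", "", false
	}
}

// swapTaint returns a new list in which `remove` is dropped and `add`
// is present exactly once. Unrelated taints are kept untouched.
func swapTaint(taints []string, add, remove string) []string {
	out := make([]string, 0, len(taints)+1)
	hasAdd := false
	for _, t := range taints {
		if t == remove {
			continue
		}
		if t == add {
			hasAdd = true
		}
		out = append(out, t)
	}
	if !hasAdd {
		out = append(out, add)
	}
	return out
}

func main() {
	// "example.com/dedicated" is a made-up unrelated taint key.
	taints := []string{"example.com/dedicated", notReadyKey}
	if add, remove, ok := pickTaints("Unknown"); ok {
		taints = swapTaint(taints, add, remove)
	}
	fmt.Println(taints)
}
```

Here the node goes from not-ready to unreachable, so the not-ready key is replaced while the unrelated taint stays.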
3.5 nc.doEvictionPass
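The nc.doEvictionPass method takes nodes from nc.zonePodEvictor and evicts (deletes) all pods on each node except DaemonSet pods.
Its main logic is to loop over nc.zonePodEvictor and process each entry:
(1) Take a node out of nc.zonePodEvictor, then fetch the node object from the informer's local cache;
(2) Call nc.getPodsAssignedToNode to get all pods on the node;
(3) Call nodeutil.DeletePods to delete all pods on the node except DaemonSet pods.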
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
}
nodeUID, _ := value.UID.(string)
pods, err := nc.getPodsAssignedToNode(value.Value)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
return false, 0
}
remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
// We are not setting eviction status here.
// New pods will be handled by zonePodEvictor retry
// instead of immediate pod eviction.
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if !nc.nodeEvictionMap.setStatus(value.Value, evicted) {
klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", value.Value)
}
if remaining {
klog.Infof("Pods awaiting deletion due to Controller eviction")
}
if node != nil {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
return true, 0
})
}
}
3.6 nc.monitorNodeHealth
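The nc.monitorNodeHealth method continuously monitors node status. Based on the number of unhealthy nodes in each zone of the cluster and the eviction-rate startup parameters of kube-controller-manager (kcm), it sets a separate eviction rate for each zone (this rate applies whether or not taint-based eviction is enabled). When the time since a node's last heartbeat (the renew time of its node lease) exceeds nodeMonitorGracePeriod (or nodeStartupGracePeriod for a node that has just started), it updates the ready condition of the node object and handles eviction accordingly:
(1) When taint-based eviction is enabled and the node's ready condition is not true, it adds the NoExecute taint, puts the node into zoneNoExecuteTainter, and leaves the eviction to taintManager;
(2) When taint-based eviction is disabled and the node's ready condition has not been true for podEvictionTimeout, it starts evicting the pods.
Main logic:
(1) Fetch all node objects from the informer's local cache;
(2) Call nc.classifyNodes to sort these nodes into three groups, added, deleted, and newZoneRepresentatives (newly added nodes, deleted nodes, and nodes that belong to a new zone), and handle each group differently (each node is assigned a zoneState according to its region and zone labels). There are four zoneStates: Initial, Normal, FullDisruption, and PartialDisruption. A newly added node starts with zoneState Initial; the other zoneStates each correspond to a different eviction rate;
(3) Iterate over the node list and process each node:
(3-1) Call nc.tryUpdateNodeHealth, which uses the node's current ready condition, the renew time of its node lease, and so on to update the data in nc.nodeHealthMap and the node's conditions, and returns the node's gracePeriod, observedReadyCondition, and currentReadyCondition. observedReadyCondition can be understood as the node's state at the previous probe, and currentReadyCondition as the state computed this time;
(3-2) If currentReadyCondition is not nil, call nc.getPodsAssignedToNode to list all pods on the node;
(3-3) If nc.useTaintBasedEvictions is true, i.e. taint-based eviction is enabled, call nc.processTaintBaseEviction. When the node's ready condition is false, remove the Unreachable taint, add the NotReady taint, and add the node to the zoneNoExecuteTainter queue; when it is unknown, remove the NotReady taint, add the Unreachable taint, and add the node to the zoneNoExecuteTainter queue; when it is true, remove both the NotReady and Unreachable taints and remove the node from the zoneNoExecuteTainter queue;
(3-4) If nc.useTaintBasedEvictions is false, call nc.processNoTaintBaseEviction for further eviction handling. When the node's ready condition is false, check whether more than podEvictionTimeout has passed since the node's readyTransitionTimestamp; if so, add the node to the zonePodEvictor queue, and the pods on the node will eventually be evicted. When the ready condition is unknown, check whether more than podEvictionTimeout has passed since the node's probeTimestamp; if so, add the node to the zonePodEvictor queue, and the pods on the node will eventually be evicted. When the ready condition is true, call nc.cancelPodEviction to remove the node from the zonePodEvictor queue, so that the pods on the node are no longer evicted;
(3-5) When the node's ready condition changes from true to false or unknown, call nodeutil.MarkPodsNotReady to mark all pods on the node as NotReady;
(4) Call nc.handleDisruption, which sets a separate eviction rate for each zone based on the number of unhealthy nodes in that zone and the eviction-rate startup parameters of kcm (this rate applies whether or not taint-based eviction is enabled).
The nc.handleDisruption method is not analyzed further here; readers can look at the source themselves.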
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) monitorNodeHealth() error {
// We are listing nodes from local cache as we can tolerate some small delays
// comparing to state from etcd and there is eventual consistency anyway.
nodes, err := nc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
for i := range newZoneRepresentatives {
nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
}
for i := range added {
klog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
nc.markNodeAsReachable(added[i])
} else {
nc.cancelPodEviction(added[i])
}
}
for i := range deleted {
klog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
for i := range nodes {
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
var currentReadyCondition *v1.NodeCondition
node := nodes[i].DeepCopy()
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
if err == nil {
return true, nil
}
name := node.Name
node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed while getting a Node to retry updating node health. Probably Node %s was deleted.", name)
return false, err
}
return false, nil
}); err != nil {
klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
"Skipping - no pods will be evicted.", node.Name, err)
continue
}
// Some nodes may be excluded from disruption checking
if !isNodeExcludedFromDisruptionChecks(node) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}
if currentReadyCondition != nil {
pods, err := nc.getPodsAssignedToNode(node.Name)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to list pods of node %v: %v", node.Name, err))
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
// If error happened during node status transition (Ready -> NotReady)
// we need to mark node for retry to force MarkPodsNotReady execution
// in the next iteration.
nc.nodesToRetry.Store(node.Name, struct{}{})
}
continue
}
if nc.useTaintBasedEvictions {
nc.processTaintBaseEviction(node, &observedReadyCondition)
} else {
if err := nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod, pods); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict all pods from node %v: %v; queuing for retry", node.Name, err))
}
}
_, needsRetry := nc.nodesToRetry.Load(node.Name)
switch {
case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
// Report node event only once when status changed.
nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
fallthrough
case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
nc.nodesToRetry.Store(node.Name, struct{}{})
continue
}
}
}
nc.nodesToRetry.Delete(node.Name)
}
nc.handleDisruption(zoneToNodeConditions, nodes)
return nil
}
3.6.1 nc.tryUpdateNodeHealth
The nc.tryUpdateNodeHealth method uses the node's current ready condition, the renew time of its node lease, and so on to update the data in nc.nodeHealthMap and the node's conditions. It returns gracePeriod, the previously recorded ready condition of the node (observedReadyCondition), and the ready condition computed this time (currentReadyCondition).
Main logic of the nc.tryUpdateNodeHealth method:
(1) Read the ready condition from the node object. If it is nil, set observedReadyCondition to unknown and gracePeriod to nc.nodeStartupGracePeriod; otherwise set gracePeriod to nc.nodeMonitorGracePeriod and observedReadyCondition to the ready condition read from the node object;
(2) Assign the status, probeTimestamp, and readyTransitionTimestamp fields of nodeHealth: status is set to node.status and probeTimestamp to the current time; if the LastTransitionTime of the ready condition has changed, readyTransitionTimestamp is set to the current time;
(3) Fetch the node's lease object and check whether its spec.RenewTime is newer than the previously recorded one (nodeHealth.lease.spec.RenewTime); if so, update nodeHealth's lease to the new lease object and set probeTimestamp to the current time;
(4) Check whether the time since the last probe (probeTimestamp) exceeds gracePeriod (nc.nodeMonitorGracePeriod, or nc.nodeStartupGracePeriod when the node has no ready condition yet); if so, set the node's ready, memoryPressure, diskPressure, and pidPressure conditions to unknown. Then compare the previously recorded ready condition with the current one; if they differ, set nodeHealth's readyTransitionTimestamp to the current time;
(5) Finally, return gracePeriod, the previously recorded ready condition (observedReadyCondition), and the ready condition computed this time (currentReadyCondition).
3.6.2 nc.processTaintBaseEviction
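The nc.processTaintBaseEviction method implements the taint-based eviction logic:
(1) When the node's ready condition is false, remove the Unreachable taint, add the NotReady taint, and add the node to the zoneNoExecuteTainter queue;
(2) When it is unknown, remove the NotReady taint, add the Unreachable taint, and add the node to the zoneNoExecuteTainter queue;
(3) When it is true, remove both the NotReady and Unreachable taints, then remove the node from the zoneNoExecuteTainter queue.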
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition) {
decisionTimestamp := nc.now()
// Check eviction timeout against decisionTimestamp
switch observedReadyCondition.Status {
case v1.ConditionFalse:
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
klog.V(2).Infof("Node %v is NotReady as of %v. Adding it to the Taint queue.",
node.Name,
decisionTimestamp,
)
}
case v1.ConditionUnknown:
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
klog.V(2).Infof("Node %v is unresponsive as of %v. Adding it to the Taint queue.",
node.Name,
decisionTimestamp,
)
}
case v1.ConditionTrue:
removed, err := nc.markNodeAsReachable(node)
if err != nil {
klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
}
if removed {
klog.V(2).Infof("Node %s is healthy again, removing all taints", node.Name)
}
}
}
3.6.3 nc.processNoTaintBaseEviction
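The nc.processNoTaintBaseEviction method implements the eviction logic used when taint-based eviction is disabled:
(1) When the node's ready condition is false, check whether more than podEvictionTimeout has passed since the node's readyTransitionTimestamp; if so, add the node to the zonePodEvictor queue, and the pods on the node will eventually be evicted;
(2) When the node's ready condition is unknown, check whether more than podEvictionTimeout has passed since the node's probeTimestamp; if so, add the node to the zonePodEvictor queue, and the pods on the node will eventually be evicted;
(3) When the node's ready condition is true, call nc.cancelPodEviction to remove the node from the zonePodEvictor queue, so that the pods on the node are no longer evicted.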
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error {
decisionTimestamp := nc.now()
nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
if nodeHealthData == nil {
return fmt.Errorf("health data doesn't exist for node %q", node.Name)
}
// Check eviction timeout against decisionTimestamp
switch observedReadyCondition.Status {
case v1.ConditionFalse:
if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nodeHealthData.readyTransitionTimestamp,
nc.podEvictionTimeout,
)
}
}
case v1.ConditionUnknown:
if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nodeHealthData.readyTransitionTimestamp,
nc.podEvictionTimeout-gracePeriod,
)
}
}
case v1.ConditionTrue:
if nc.cancelPodEviction(node) {
klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
return nil
}
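Summary
The main functions of NodeLifecycleController are:
(1) Periodically check node heartbeats. When a node has not reported a heartbeat for a certain period, update the node's ready condition to false or unknown and, if taint-based eviction is enabled, add a NoExecute taint to the node;
(2) When taint-based eviction is disabled and the node's ready condition has been false or unknown for a configurable period, evict (delete) the pods on the node;
(3) When taint-based eviction is enabled and the node carries a NoExecute taint, pods that do not tolerate the taint are evicted (deleted) immediately, while pods that tolerate it are evicted (deleted) only after the smallest toleration time among all the taints has elapsed.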
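The "smallest toleration time" in item (3) can be sketched as below. minTolerationTime is a hypothetical helper written for illustration, not the taint manager's own function; it assumes each matching toleration contributes an optional tolerationSeconds value, where nil means the taint is tolerated indefinitely and negative values are treated as 0.

```go
package main

import (
	"fmt"
	"time"
)

// minTolerationTime returns the shortest tolerationSeconds among the given
// values. A nil entry means "tolerate forever" and is skipped. The second
// result is false when every toleration is unbounded, i.e. the pod is
// never evicted because of these taints.
func minTolerationTime(seconds []*int64) (time.Duration, bool) {
	found := false
	var min int64
	for _, s := range seconds {
		if s == nil {
			continue // no tolerationSeconds set: tolerated indefinitely
		}
		v := *s
		if v < 0 {
			v = 0 // negative values mean "evict immediately"
		}
		if !found || v < min {
			min, found = v, true
		}
	}
	return time.Duration(min) * time.Second, found
}

func main() {
	notReady, unreachable := int64(300), int64(60)
	wait, bounded := minTolerationTime([]*int64{&notReady, nil, &unreachable})
	fmt.Println(wait, bounded)
}
```

In this example the pod tolerates one taint for 300 seconds and another for 60 seconds, so it would be evicted after the shorter of the two.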
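The zone concept in eviction
Nodes are assigned to zones according to the values of the region and zone labels on each node object;
nodes whose region and zone values are both the same belong to the same zone.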
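The grouping rule can be sketched as follows. The label keys, the "/" separator, and the zoneKey and groupByZone helpers are assumptions made for illustration; the controller's own utility may use different label keys and a different key format.

```go
package main

import (
	"fmt"
	"sort"
)

// Label keys assumed for this sketch.
const (
	regionLabel = "topology.kubernetes.io/region"
	zoneLabel   = "topology.kubernetes.io/zone"
)

// zoneKey builds a grouping key from a node's region and zone labels.
// "/" is a hypothetical separator; label values cannot contain it,
// so two different (region, zone) pairs never produce the same key.
func zoneKey(labels map[string]string) string {
	region, zone := labels[regionLabel], labels[zoneLabel]
	if region == "" && zone == "" {
		return ""
	}
	return region + "/" + zone
}

// groupByZone maps each zone key to the sorted names of its nodes.
func groupByZone(nodes map[string]map[string]string) map[string][]string {
	groups := map[string][]string{}
	for name, labels := range nodes {
		key := zoneKey(labels)
		groups[key] = append(groups[key], name)
	}
	for key := range groups {
		sort.Strings(groups[key])
	}
	return groups
}

func main() {
	nodes := map[string]map[string]string{
		"node-1": {regionLabel: "r1", zoneLabel: "a"},
		"node-2": {regionLabel: "r1", zoneLabel: "a"},
		"node-3": {regionLabel: "r1", zoneLabel: "b"},
	}
	groups := groupByZone(nodes)
	fmt.Println(groups["r1/a"], groups["r1/b"])
}
```

node-1 and node-2 share both region and zone and therefore land in the same group; node-3 differs in zone and gets its own.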
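Zone states
A zone can be in one of four states:
(1) Initial: the initial state;
(2) FullDisruption: the number of ready nodes is 0 and the number of not-ready nodes is greater than 0;
(3) PartialDisruption: the number of not-ready nodes is greater than 2 and their share of the zone is greater than or equal to unhealthyZoneThreshold;
(4) Normal: every case not covered by the three states above.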
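The rules for the three computed states can be written down as a short sketch. computeZoneState below is a simplified illustration that takes a slice of booleans (true meaning the node is ready) instead of node conditions; Initial is left out because, per the description above, it is the starting state rather than a computed one.

```go
package main

import "fmt"

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	stateFullDisruption    zoneState = "FullDisruption"
	statePartialDisruption zoneState = "PartialDisruption"
)

// computeZoneState returns the number of not-ready nodes in a zone and the
// resulting state. nodeReady holds one entry per node; true means ready.
func computeZoneState(nodeReady []bool, unhealthyZoneThreshold float32) (int, zoneState) {
	readyNodes, notReadyNodes := 0, 0
	for _, ready := range nodeReady {
		if ready {
			readyNodes++
		} else {
			notReadyNodes++
		}
	}
	switch {
	case readyNodes == 0 && notReadyNodes > 0:
		return notReadyNodes, stateFullDisruption
	case notReadyNodes > 2 && float32(notReadyNodes)/float32(readyNodes+notReadyNodes) >= unhealthyZoneThreshold:
		return notReadyNodes, statePartialDisruption
	default:
		return notReadyNodes, stateNormal
	}
}

func main() {
	const threshold = 0.55
	fmt.Println(computeZoneState([]bool{false, false}, threshold))
	fmt.Println(computeZoneState([]bool{false, false, false, true, true}, threshold))
	fmt.Println(computeZoneState([]bool{false, false, true}, threshold))
}
```

Note the third case: two of three nodes are not ready (about 67%), but the zone is still Normal because the not-ready count must also be greater than 2.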
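Pay attention to how the secondary eviction rate, i.e. the kcm startup parameter --secondary-node-eviction-rate, affects eviction. When the fraction of unhealthy nodes in a zone reaches --unhealthy-zone-threshold (default 0.55), the eviction rate is reduced: if the zone is not a LargeCluster (its node count is less than or equal to --large-cluster-size-threshold, default 50), eviction stops; if it is a LargeCluster, the eviction rate drops to --secondary-node-eviction-rate nodes per second (default 0.01).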
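The rule for a zone that has crossed the unhealthy threshold can be sketched as a single function. partialDisruptionQPS is a hypothetical name; the sketch only covers the reduced-rate case described above and uses the default values quoted in the text as example inputs.

```go
package main

import "fmt"

// partialDisruptionQPS returns the eviction rate (nodes per second) for a
// zone whose unhealthy fraction has reached the threshold.
// Zones with more nodes than largeClusterThreshold keep evicting at the
// secondary rate; smaller zones stop evicting (rate 0).
func partialDisruptionQPS(zoneSize, largeClusterThreshold int, secondaryEvictionRate float32) float32 {
	if zoneSize > largeClusterThreshold {
		return secondaryEvictionRate
	}
	return 0
}

func main() {
	const (
		largeClusterThreshold = 50   // --large-cluster-size-threshold default
		secondaryRate         = 0.01 // --secondary-node-eviction-rate default
	)
	fmt.Println(partialDisruptionQPS(30, largeClusterThreshold, secondaryRate))
	fmt.Println(partialDisruptionQPS(50, largeClusterThreshold, secondaryRate))
	fmt.Println(partialDisruptionQPS(200, largeClusterThreshold, secondaryRate))
}
```

A zone with exactly 50 nodes is still not a LargeCluster, so only the 200-node zone keeps evicting, at 0.01 nodes per second (roughly one node every 100 seconds).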