kielchan / kubelet

learn kubelet source code
As mentioned last time, understanding pod creation requires first understanding how informers work. Beyond that, you also need some familiarity with parts of the apiserver code, or the kubelet code becomes exceptionally hard to follow. In particular, some functions return at different points depending on the pod's phase; without tracing a concrete pod through the code, the whole thing is very painful to read.
Fortunately, connecting all the pieces is not complicated.
---
apiVersion: v1
kind: Pod
metadata:
  name: hello-world-app
spec:
  containers:
  - name: nginx
    image: u-stress:0.1
I1207 11:44:07.921744 75035 reflector.go:398] kiel add obj: &Pod{ObjectMeta:k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{Name:hello-world-app,GenerateName:,Namespace:default,SelfLink:/api/v1/namespaces/default/pods/hello-world-app,UID:59bc88fd-f9d2-11e8-a6f0-567909393959,ResourceVersion:42127503,Generation:0,CreationTimestamp:2018-12-07 11:44:07 +0800 CST,DeletionTimestamp:<nil>,DeletionGracePeriodSeconds:nil,Labels:map[string]string{},Annotations:map[string]string{kubectl.kubernetes.io/last-applied-configuration: {"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"hello-world-app","namespace":"default"},"spec":{"containers":[{"image":"u-stress:0.1","name":"nginx"}]}},},OwnerReferences:[],Finalizers:[],ClusterName:,Initializers:nil,},Spec:PodSpec{Volumes:[{default-token-f7djb {nil nil nil nil nil SecretVolumeSource{SecretName:default-token-f7djb,Items:[],DefaultMode:*420,Optional:nil,} nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil}}],Containers:[{nginx u-stress:0.1 [] [] [] [] [] {map[] map[]} [{default-token-f7djb true /var/run/secrets/kubernetes.io/serviceaccount <nil>}] [] nil nil nil /dev/termination-log File IfNotPresent nil false false false}],RestartPolicy:Always,TerminationGracePeriodSeconds:*30,ActiveDeadlineSeconds:nil,DNSPolicy:ClusterFirst,NodeSelector:map[string]string{},ServiceAccountName:default,DeprecatedServiceAccount:default,NodeName:svr3438hw1288,HostNetwork:false,HostPID:false,HostIPC:false,SecurityContext:&PodSecurityContext{SELinuxOptions:nil,RunAsUser:nil,RunAsNonRoot:nil,SupplementalGroups:[],FSGroup:nil,},ImagePullSecrets:[],Hostname:,Subdomain:,Affinity:nil,SchedulerName:default-scheduler,InitContainers:[],AutomountServiceAccountToken:nil,Tolerations:[{node.kubernetes.io/not-ready Exists NoExecute 0xc001bcd850} {node.kubernetes.io/unreachable Exists NoExecute 0xc001bcdaa0}],HostAliases:[],PriorityClassName:,Priority:nil,DNSConfig:nil,},Status:PodStatus{Phase:Pending,Conditions:[{PodScheduled True 0001-01-01 00:00:00 +0000 UTC 2018-12-07 11:44:07 +0800 CST }],Message:,Reason:,HostIP:,PodIP:,StartTime:<nil>,ContainerStatuses:[],QOSClass:BestEffort,InitContainerStatuses:[],},}
I1207 11:44:07.922373 75035 kubelet.go:1857] SyncLoop (ADD, "api"): "hello-world-app_default(59bc88fd-f9d2-11e8-a6f0-567909393959)"
As usual, the relevant logs first (this run is separate from the one above, hence the different pod UID and timestamps):
I1212 16:44:16.766293 75717 kubelet.go:1857] SyncLoop (ADD, "api"): "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:16.766470 75717 status_manager.go:367] Status Manager: adding pod: "1be94121-fdea-11e8-99b5-567909393919", with status: ('\x01', {Pending [{Initialized True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST } {Ready False 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST ContainersNotReady containers with unready status: [nginx]} {PodScheduled True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST }] 10.21.208.99 2018-12-12 16:44:16 +0800 CST [] [{nginx {&ContainerStateWaiting{Reason:ContainerCreating,Message:,} nil nil} {nil nil nil} false 0 u-stress:0.1 }] BestEffort}) to podStatusChannel
I1212 16:44:16.766573 75717 status_manager.go:146] Status Manager: syncing pod: "1be94121-fdea-11e8-99b5-567909393919", with status: (1, {Pending [{Initialized True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST } {Ready False 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST ContainersNotReady containers with unready status: [nginx]} {PodScheduled True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST }] 10.21.208.99 2018-12-12 16:44:16 +0800 CST [] [{nginx {&ContainerStateWaiting{Reason:ContainerCreating,Message:,} nil nil} {nil nil nil} false 0 u-stress:0.1 }] BestEffort}) from podStatusChannel
// Delivered from the updates channel to the kubelet's HandlePodAdditions
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// omitted ...
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
}
// omitted ...
}
// omitted ...
return true
}
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
// A batch of pods may be created at roughly the same time, but here they must be sorted by creation time
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
// There is an upper limit here; overflowing it normally logs a warning
// Responsible for checking limits in resolv.conf
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
// For a freshly started kubelet, the podManager contains nothing yet
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
// Normally a non-static pod does not take this branch
if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// A newly created pod always enters this branch; other situations may skip it
if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not
// terminated.
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
// Check if we can admit the pod; if not, reject it.
// Resource constraints can cause a reject here, typically after eviction parameters are changed and the kubelet restarted
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
// Formally hand the pod off for processing
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}
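The sort at the top of HandlePodAdditions relies on sliceutils.PodsByCreationTime, which is just a sort.Interface keyed on CreationTimestamp, oldest first. A minimal equivalent, sketched with a hypothetical pod struct rather than *v1.Pod:
package main

import (
	"fmt"
	"sort"
	"time"
)

// pod is a stand-in for *v1.Pod carrying only what the sort needs.
type pod struct {
	name    string
	created time.Time
}

// byCreationTime mirrors sliceutils.PodsByCreationTime: oldest pod first,
// so pods admitted in a batch are processed in creation order.
type byCreationTime []pod

func (p byCreationTime) Len() int      { return len(p) }
func (p byCreationTime) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p byCreationTime) Less(i, j int) bool {
	return p[i].created.Before(p[j].created)
}

func main() {
	now := time.Now()
	pods := byCreationTime{
		{"b", now.Add(time.Second)},
		{"a", now},
	}
	sort.Sort(pods)
	fmt.Println(pods[0].name, pods[1].name) // a b
}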
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// This check is mandatory because the function has multiple entry points; for a newly created pod the branch is skipped
if kl.podIsTerminated(pod) {
if pod.DeletionTimestamp != nil {
// If the pod is in a terminated state, there is no pod worker to
// handle the work item. Check if the DeletionTimestamp has been
// set, and force a status update to trigger a pod deletion request
// to the apiserver.
kl.statusManager.TerminatePod(pod)
}
return
}
// Run the sync in an async worker.
// podWorkers and podManager are not related hierarchically. A podWorker mainly talks to other
// kubelet components, typically the statusManager and the container runtime (docker), plus a
// retry queue for both successful and failed syncs so pod processing can never get stuck.
// A podWorker shares its pod's lifecycle: once the pod is gone, the podWorker is removed as well.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
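The comment above compresses a lot, so here is the shape of the mechanism: podWorkers keeps one goroutine and one channel per pod UID, which serializes updates for a single pod while letting different pods sync in parallel. A minimal sketch of that pattern with hypothetical types (the real podWorkers additionally tracks in-flight work and the last undelivered update):
package main

import (
	"fmt"
	"sync"
	"time"
)

// update stands in for UpdatePodOptions in this sketch.
type update struct {
	podUID string
	op     string
}

// workers is a stripped-down podWorkers: one goroutine and one channel per pod
// UID, so updates to the same pod are serialized while pods proceed in parallel.
type workers struct {
	mu     sync.Mutex
	perPod map[string]chan update
	syncFn func(update) error // the syncPodFn analogue
}

// UpdatePod lazily starts a worker goroutine for the pod's UID and enqueues the update.
func (w *workers) UpdatePod(u update) {
	w.mu.Lock()
	ch, ok := w.perPod[u.podUID]
	if !ok {
		ch = make(chan update, 1)
		w.perPod[u.podUID] = ch
		go func() {
			for up := range ch { // the managePodLoop analogue
				_ = w.syncFn(up)
			}
		}()
	}
	w.mu.Unlock()
	ch <- u
}

func main() {
	w := &workers{
		perPod: map[string]chan update{},
		syncFn: func(u update) error {
			fmt.Println("syncing pod", u.podUID, "op:", u.op)
			return nil
		},
	}
	w.UpdatePod(update{podUID: "uid-1", op: "create"})
	w.UpdatePod(update{podUID: "uid-2", op: "create"})
	time.Sleep(100 * time.Millisecond) // give the workers time to drain
}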
// Compare the podCache entry with the pod received from the apiserver to decide whether the
// podCache status should be synced to the apiserver.
// For a newly created pod there is no podCache entry yet, so the status is empty;
// its default value is typically: status: &PodStatus{ID: id}
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
podUID := update.Pod.UID
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
// Fetch the latest authoritative status; its freshness depends on when PLEG last updated the podCache.
// Normally (i.e. with PLEG running) during pod creation this returns only the default value.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
// This is the legacy event thrown by manage pod loop
// all other events are now dispatched from syncPodFn
p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
return err
}
// syncPodFn here points at the kubelet's syncPod
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()
// notify the call-back function if the operation succeeded or not
if update.OnCompleteFunc != nil {
update.OnCompleteFunc(err)
}
if err != nil {
// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
}
// The podWorker retry mechanism: even correctly synced pods are thrown back into the queue
// to retry; failed ones are re-queued after a 10s delay.
p.wrapUp(update.Pod.UID, err)
}
}
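The interesting call in this loop is podCache.GetNewerThan: it blocks until the cache holds an entry newer than lastSyncTime, which keeps a worker from re-syncing against stale runtime state. A condition-variable sketch of that semantic, simplified from the real kubecontainer.Cache (which also supports per-pod subscriptions and a global minimum timestamp):
package main

import (
	"sync"
	"time"
)

type entry struct {
	status  string // stands in for *kubecontainer.PodStatus
	modTime time.Time
}

type cache struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items map[string]entry
}

func newCache() *cache {
	c := &cache{items: map[string]entry{}}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// Set records a status and wakes up any blocked readers (the PLEG analogue).
func (c *cache) Set(uid, status string, t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[uid] = entry{status: status, modTime: t}
	c.cond.Broadcast()
}

// GetNewerThan blocks until the entry for uid is newer than minTime.
func (c *cache) GetNewerThan(uid string, minTime time.Time) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	for {
		if e, ok := c.items[uid]; ok && e.modTime.After(minTime) {
			return e.status
		}
		c.cond.Wait() // releases the lock while waiting
	}
}

func main() {
	c := newCache()
	go func() {
		time.Sleep(50 * time.Millisecond)
		c.Set("uid-1", "running", time.Now()) // PLEG updating the cache
	}()
	_ = c.GetNewerThan("uid-1", time.Time{}) // returns once the entry lands
}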
I1212 16:44:16.781736 75717 volume_manager.go:343] Waiting for volumes to attach and mount for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:16.909704 75717 reconciler.go:217] operationExecutor.VerifyControllerAttachedVolume started for volume "default-token-f7djb" (UniqueName: "kubernetes.io/secret/1be94121-fdea-11e8-99b5-567909393919-default-token-f7djb") pod "hello-world-app" (UID: "1be94121-fdea-11e8-99b5-567909393919")
I1212 16:44:17.009958 75717 reconciler.go:262] operationExecutor.MountVolume started for volume "default-token-f7djb" (UniqueName: "kubernetes.io/secret/1be94121-fdea-11e8-99b5-567909393919-default-token-f7djb") pod "hello-world-app" (UID: "1be94121-fdea-11e8-99b5-567909393919")
I1212 16:44:17.009999 75717 secret.go:186] Setting up volume default-token-f7djb for pod 1be94121-fdea-11e8-99b5-567909393919 at /var/lib/k8s/kubelet/pods/1be94121-fdea-11e8-99b5-567909393919/volumes/kubernetes.io~secret/default-token-f7djb
I1212 16:44:17.081922 75717 volume_manager.go:372] All volumes are attached and mounted for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:17.082113 75717 kuberuntime_manager.go:385] No sandbox for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)" can be found. Need to start a new one
I1212 16:44:17.082124 75717 kuberuntime_manager.go:571] computePodActions got {KillPod:true CreateSandbox:true SandboxID: Attempt:0 NextInitContainerToStart:nil ContainersToStart:[0] ContainersToKill:map[]} for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:17.082157 75717 kuberuntime_manager.go:580] SyncPod received new pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)", will create a sandbox for it
I1212 16:44:17.082164 75717 kuberuntime_manager.go:589] Stopping PodSandbox for "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)", will start new one
I1212 16:44:17.082182 75717 kuberuntime_manager.go:641] Creating sandbox for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:17.106342 75717 docker_service.go:441] Setting cgroup parent to: "/kubepods/besteffort/pod1be94121-fdea-11e8-99b5-567909393919"
I1212 16:44:17.498006 75717 docker_sandbox.go:658] Will attempt to re-write config file /var/lib/docker/containers/a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657/resolv.conf with:
[nameserver 10.96.0.10 search default.com options ndots:5]
I1212 16:44:17.498134 75717 plugins.go:412] Calling network plugin cni to set up pod "hello-world-app_default"
I1212 16:44:17.498878 75717 manager.go:970] Added container: "/kubepods/besteffort/pod1be94121-fdea-11e8-99b5-567909393919/a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657" (aliases: [k8s_POD_hello-world-app_default_1be94121-fdea-11e8-99b5-567909393919_0 a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657], namespace: "docker")
I1212 16:44:17.498913 75717 cni.go:284] Got netns path /proc/75921/ns/net
I1212 16:44:17.498919 75717 cni.go:285] Using podns path default
I1212 16:44:17.499008 75717 cni.go:256] About to add CNI network cni-loopback (type=loopback)
I1212 16:44:21.804415 75717 status_manager.go:484] Status for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)" updated successfully: (1, {Phase:Pending Conditions:[{Type:Initialized Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2018-12-12 16:44:16 +0800 CST Reason: Message:} {Type:Ready Status:False LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2018-12-12 16:44:16 +0800 CST Reason:ContainersNotReady Message:containers with unready status: [nginx]} {Type:PodScheduled Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2018-12-12 16:44:16 +0800 CST Reason: Message:}] Message: Reason: HostIP:10.21.208.99 PodIP: StartTime:2018-12-12 16:44:16 +0800 CST InitContainerStatuses:[] ContainerStatuses:[{Name:nginx State:{Waiting:&ContainerStateWaiting{Reason:ContainerCreating,Message:,} Running:nil Terminated:nil} LastTerminationState:{Waiting:nil Running:nil Terminated:nil} Ready:false RestartCount:0 Image:u-stress:0.1 ImageID: ContainerID:}] QOSClass:BestEffort})
I1212 16:44:21.970512 75717 kuberuntime_manager.go:655] Created PodSandbox "a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657" for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:21.974708 75717 kuberuntime_manager.go:674] Determined the ip "10.5.150.103" for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)" after sandbox changed
I1212 16:44:21.974790 75717 kuberuntime_manager.go:725] Creating container &Container{Name:nginx,Image:u-stress:0.1,Command:[],Args:[],WorkingDir:,Ports:[],Env:[],Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[{default-token-f7djb true /var/run/secrets/kubernetes.io/serviceaccount <nil>}],LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:IfNotPresent,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[],TerminationMessagePolicy:File,VolumeDevices:[],} in pod hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// omitted ...
// Generate final API pod status with pod and status manager status
// This is very important: the result is eventually reported to the statusManager and synced
// with the apiserver. Generally, a freshly created pod has phase Pending.
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IP = apiPodStatus.PodIP
// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
runnable := kl.canRunPod(pod)
if !runnable.Admit {
// Pod is not runnable; update the Pod and Container statuses to why.
apiPodStatus.Reason = runnable.Reason
apiPodStatus.Message = runnable.Message
// Waiting containers are not creating.
const waitingReason = "Blocked"
for _, cs := range apiPodStatus.InitContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
for _, cs := range apiPodStatus.ContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
}
// Set the pod status inside the status manager, ready to be synced to the apiserver.
// For a newly created pod the podstatus here contains little more than image information.
// Update status in the status manager
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// omitted ...
// For a newly created pod, some preparation is needed before container creation:
// 1. is the network plugin ready
// 2. data dirs
// 3. volumes
// 4. image pull secrets
// If the network plugin is not ready, only start the pod if it uses the host network
if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
}
// omitted ...
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
return err
}
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
klog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
return err
}
}
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Call the runtime (docker) to create the pod: the sandbox, the user's containers, and so on
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
// Do not return error if the only failures were pods in backoff
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime so we get better errors
return err
}
}
return nil
}
return nil
}
generateAPIPodStatus, kl.volumeManager.WaitForAttachAndMount, and kl.containerRuntime.SyncPod are the three especially important steps in creating a pod. On one hand they trace the mandatory path of pod creation; on the other, they carry the evolution of the pod status inside the kubelet. The three are examined in depth below, in that order.
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
klog.V(3).Infof("Generating status for %q", format.Pod(pod))
// Set the pod IP, qosClass, containerStatus/initContainerStatus, and podConditions on the API pod status.
// containerStatus carries basic container information: container ID, start/stop times, etc.
s := kl.convertStatusToAPIStatus(pod, podStatus)
// omitted ...
// Assume info is ready to process
spec := &pod.Spec
allStatus := append(append([]v1.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
// phase is the kubelet's state-machine classification of the pod, generally one of
// PodPending, PodRunning, PodSucceeded, PodFailed.
// Since phase is derived from container statuses, and those statuses arrive via the
// statusManager with some delay, a newly created pod (whose containers have not yet been
// created through the runtime) has phase Pending.
s.Phase = getPhase(spec, allStatus)
// omitted ...
// Set the readiness and liveness states
kl.probeManager.UpdatePodStatus(pod.UID, s)
// Set the pod's conditions
s.Conditions = append(s.Conditions, status.GeneratePodInitializedCondition(spec, s.InitContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.Conditions, s.ContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GenerateContainersReadyCondition(spec, s.ContainerStatuses, s.Phase))
// Status manager will take care of the LastTransitionTimestamp, either preserve
// the timestamp from apiserver, or set a new one. When kubelet sees the pod,
// `PodScheduled` condition must be true.
// PodScheduled holds because the pod has been bound to this node; by the time this node's kubelet watches the pod, scheduling is already done
s.Conditions = append(s.Conditions, v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
})
if kl.kubeClient != nil {
hostIP, err := kl.getHostIPAnyWay()
if err != nil {
klog.V(4).Infof("Cannot get host IP: %v", err)
} else {
s.HostIP = hostIP.String()
if kubecontainer.IsHostNetworkPod(pod) && s.PodIP == "" {
s.PodIP = hostIP.String()
}
}
}
return *s
}
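To make the "phase is Pending" claim concrete, here is a compressed sketch of the decision getPhase makes, under the simplifying assumption that only regular container states matter (the real function also weighs init containers, restart policy, and unknown states):
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// phaseFor is a simplified stand-in for getPhase: nothing started yet means
// Pending; any running container means Running; all-terminated resolves to
// Succeeded or Failed by exit code.
func phaseFor(statuses []v1.ContainerStatus) v1.PodPhase {
	if len(statuses) == 0 {
		return v1.PodPending
	}
	running, waiting, failed := 0, 0, 0
	for _, s := range statuses {
		switch {
		case s.State.Running != nil:
			running++
		case s.State.Terminated != nil:
			if s.State.Terminated.ExitCode != 0 {
				failed++
			}
		default: // Waiting or zero-valued state
			waiting++
		}
	}
	switch {
	case running > 0:
		return v1.PodRunning
	case waiting > 0:
		return v1.PodPending // a freshly created pod lands here
	case failed > 0:
		return v1.PodFailed
	default:
		return v1.PodSucceeded
	}
}

func main() {
	creating := []v1.ContainerStatus{{
		Name:  "nginx",
		State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "ContainerCreating"}},
	}}
	fmt.Println(phaseFor(creating)) // Pending
}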
// The volumeManager is logically simple but built from particularly complex components. It is
// responsible for creating and tearing down pod volumes. It typically cooperates with the
// podManager and the statusManager to extract the pod spec and the pod status respectively,
// and decides from those what to do with the pod's volumes.
func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
if pod == nil {
return nil
}
// Parse the required volumes out of the pod spec
expectedVolumes := getExpectedVolumes(pod)
if len(expectedVolumes) == 0 {
// No volumes to verify
return nil
}
klog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
// Get the pod's unique name (derived from its UID)
uniquePodName := util.GetUniquePodName(pod)
// Some pods expect to have Setup called over and over again to update.
// Remount plugins for which this is true. (Atomically updating volumes,
// like Downward API, depend on this to update the contents of the volume).
vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
// Compare the already-mounted volumes against the expected ones; within the 2m3s window,
// exit normally once every volume is mounted.
err := wait.PollImmediate(
podAttachAndMountRetryInterval,
podAttachAndMountTimeout,
vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
if err != nil {
// Timeout expired
unmountedVolumes :=
vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
// Also get unattached volumes for error message
unattachedVolumes :=
vm.getUnattachedVolumes(expectedVolumes)
if len(unmountedVolumes) == 0 {
return nil
}
return fmt.Errorf(
"timeout expired waiting for volumes to attach or mount for pod %q/%q. list of unmounted volumes=%v. list of unattached volumes=%v",
pod.Namespace,
pod.Name,
unmountedVolumes,
unattachedVolumes)
}
klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
return nil
}
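The 2m3s window is podAttachAndMountTimeout, and the loop itself is plain wait.PollImmediate: run the predicate once immediately, then on every tick, until success or timeout. A minimal usage sketch of the same primitive; the 300ms interval is assumed to mirror podAttachAndMountRetryInterval, and the readiness-marker predicate is purely hypothetical:
package main

import (
	"fmt"
	"os"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	const (
		retryInterval = 300 * time.Millisecond        // assumed podAttachAndMountRetryInterval
		timeout       = 2*time.Minute + 3*time.Second // podAttachAndMountTimeout
	)
	// PollImmediate runs the condition right away, then on every tick, and
	// returns wait.ErrWaitTimeout once the timeout expires.
	err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
		_, statErr := os.Stat("/var/run/ready-marker") // hypothetical readiness marker
		return statErr == nil, nil
	})
	if err != nil {
		fmt.Println("timed out waiting:", err)
	}
}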
// Next comes the heart of pod creation: calling the runtime to create the sandbox and containers.
// For a pod going through creation, it is simply created; still, computePodActions deserves
// attention: it mostly determines the cleanup needed for pods whose previous creation attempt
// failed, which may include recreating the sandbox, removing dirty containers and rebuilding
// them, and so on.
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
// omitted ...
podIP := ""
if podStatus != nil {
podIP = podStatus.IP
}
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
var msg string
var err error
klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)
// Create the sandbox via an RPC call.
// Creating a sandbox through docker generally follows this flow:
// 1. generate the podSandboxConfig
// 2. RPC call to create a sandbox from that config
// 3. the RPC server forwards the request to the runtime
// 4. the runtime parses the config
// 5. the runtime pulls the image
// 6. generate the sandbox container's config, apply the podsandbox labels, etc.
// 7. call docker to create the sandbox container
// 8. mark the sandbox network as not ready and create the sandbox checkpoint
// 9. start the sandbox container
// 10. rewrite resolv.conf
// 11. call the CNI plugin to set up the sandbox network
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox: %v", err)
return
}
klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if err != nil {
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
result.Fail(err)
return
}
// If we ever allow updating a pod from non-host-network to
// host-network, we may use a stale IP.
if !kubecontainer.IsHostNetworkPod(pod) {
// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
klog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
}
}
// Get podSandboxConfig for containers to start.
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
result.AddSyncResult(configPodSandboxResult)
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
if err != nil {
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
klog.Error(message)
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
return
}
// With the sandbox config in hand, the initContainers and containers can be created.
// Creating a container involves:
// 1. pulling the image
// 2. generating the container config and creating the container
// 3. starting the container
// 4. running the post-start hook
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
klog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
return
}
klog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
startContainerResult.Fail(err, msg)
utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
return
}
// Successfully started the container; clear the entry in the failure
klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
klog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
continue
}
klog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
klog.V(3).Infof("container start failed: %v: %s", err, msg)
default:
utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
}
continue
}
}
return
}
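A detail worth calling out: doBackOff consults a shared *flowcontrol.Backoff keyed per container, which is the mechanism behind CrashLoopBackOff pacing. A usage sketch of that utility; the 10s-to-5m range matches the kubelet's defaults as I understand them, and the key format here is hypothetical:
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Exponential backoff starting at 10s, capped at 5m — the same shape the
	// kubelet uses for container restarts (exact constants may differ).
	backoff := flowcontrol.NewBackOff(10*time.Second, 5*time.Minute)
	key := "default_hello-world-app_nginx" // hypothetical per-container key

	now := time.Now()
	if backoff.IsInBackOffSince(key, now) {
		fmt.Println("still backing off; would report CrashLoopBackOff")
		return
	}
	// Record a failure: sets (then doubles) the delay for this key, up to the cap.
	backoff.Next(key, now)
	fmt.Println("next delay:", backoff.Get(key))
}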
To understand how the kubelet drives the pod lifecycle through watch-triggered events, we first need some built-in machinery, and the most important piece is list-and-watch.
It fetches pods' basic information and change events from a given source, compares them against the local cache, and thereby drives the kubelet to perform its ADD, UPDATE, and REMOVE actions.
First the pod sources must be defined. There are currently three: StaticPodPath, StaticPodURL, and the apiserver. Here we only care about the most common case: pods sourced from the apiserver.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// omitted...
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
return cfg, nil
}
What follows is a bridge: the list-and-watch functions pull data from the apiserver and store it in the local cache first; whenever the local cache changes, its full contents are pushed through the source channel into the preprocessing logic, eventually surfacing on PodConfig.updates.
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
// Instantiate the channel
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}
// Register a new source, keep listening on its channel, and process and redistribute what arrives
func (m *Mux) Channel(source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
m.sourceLock.Lock()
defer m.sourceLock.Unlock()
channel, exists := m.sources[source]
if exists {
return channel
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
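The Mux is a small fan-in: each named source gets a dedicated channel, and a per-source goroutine forwards whatever arrives into a shared merger (podStorage.Merge here). A stripped-down sketch of the same shape, with a toy merger standing in for podStorage:
package main

import (
	"fmt"
	"sync"
	"time"
)

// merger is the interface the real Mux delegates to (podStorage implements it).
type merger interface {
	Merge(source string, change interface{}) error
}

type printMerger struct{}

func (printMerger) Merge(source string, change interface{}) error {
	fmt.Printf("merge from %q: %v\n", source, change)
	return nil
}

// mux is a stripped-down config.Mux: one channel per named source, each with a
// goroutine forwarding everything it receives into the shared merger.
type mux struct {
	mu      sync.Mutex
	sources map[string]chan interface{}
	m       merger
}

// Channel returns the per-source channel, lazily creating it and its listener,
// the same pattern as Mux.Channel above.
func (x *mux) Channel(source string) chan interface{} {
	x.mu.Lock()
	defer x.mu.Unlock()
	if ch, ok := x.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	x.sources[source] = ch
	go func() {
		for change := range ch {
			_ = x.m.Merge(source, change)
		}
	}()
	return ch
}

func main() {
	x := &mux{sources: map[string]chan interface{}{}, m: printMerger{}}
	api := x.Channel("api")
	api <- "SET: [hello-world-app]"
	time.Sleep(50 * time.Millisecond) // let the listener finish the Merge
}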
// First, instantiate the list-and-watch functions; the concrete functions are built for the specified resource type
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// By this point the resource type is fixed; here it is pods
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// Note the send function: it is how the list-and-watch results are passed through; this
// updates is the channel obtained from cfg.Channel above. Once initialization completes,
// the reflector starts running.
// Also note the type pushed into the channel: a PodUpdate with Op kubetypes.SET.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: "0"}
r.metrics.numberOfLists.Inc()
start := r.clock.Now()
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
r.metrics.listDuration.Observe(time.Since(start).Seconds())
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
r.metrics.numberOfItemsInList.Observe(float64(len(items)))
// Store the items in the local cache as key-value pairs via Replace; after the replace, the full list held by the cache is pushed to the source channel
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
r.setLastSyncResourceVersion(resourceVersion)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// Although the resync logic is written here, the cache the kubelet uses does not support resync
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// omitted ...
}
// Calling Replace pushes all data held in the local cache to the source channel in full
func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
if err := u.Store.Replace(list, resourceVersion); err != nil {
return err
}
u.PushFunc(u.Store.List())
return nil
}
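UndeltaStore is the crux of the full-state push: every mutation re-sends the entire current list instead of a delta, which is why the downstream merge logic always sees complete SET snapshots. A minimal rendition of that idea, using a plain string-keyed map instead of the real cache.Store:
package main

import "fmt"

// undeltaStore pushes its complete contents after every mutation,
// mirroring how cache.UndeltaStore wraps a Store with a PushFunc.
type undeltaStore struct {
	items    map[string]interface{}
	pushFunc func([]interface{})
}

func (u *undeltaStore) list() []interface{} {
	out := make([]interface{}, 0, len(u.items))
	for _, v := range u.items {
		out = append(out, v)
	}
	return out
}

func (u *undeltaStore) Add(key string, obj interface{}) {
	u.items[key] = obj
	u.pushFunc(u.list()) // full state, not a delta
}

func (u *undeltaStore) Delete(key string) {
	delete(u.items, key)
	u.pushFunc(u.list()) // deletions also push the full remaining state
}

func main() {
	s := &undeltaStore{
		items:    map[string]interface{}{},
		pushFunc: func(all []interface{}) { fmt.Println("push:", all) },
	}
	s.Add("default/hello-world-app", "podA") // push: [podA]
	s.Add("default/other", "podB")           // push: both pods (map order varies)
	s.Delete("default/other")                // push: [podA]
}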
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
// omitted ...
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
}
r.metrics.numberOfWatches.Inc()
// Given the context, what is watched here are pod changes
w, err := r.listerWatcher.Watch(options)
// omitted ...
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
}
}
// Dispatch according to the type of the watched event
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
// omitted
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
// Operate on the cache according to the watched event type:
// Added: insert a new key-value pair into the cache (plus some index additions, not detailed here)
// Modified: update the key-value pair in the cache and adjust the index
// Deleted: remove the key-value pair and its index from the cache
// Whichever the operation, once the cache has been updated, the full contents of the cache are pushed to the source channel
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
r.metrics.numberOfShortWatches.Inc()
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
return nil
}
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
// The decision is made here: pods pushed over by the cache are sorted into their buckets.
// Keep in mind that by this point the cache itself has already been updated.
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
// Dispatch the buckets by type onto podConfig's updates channel, which feeds them to the kubelet for per-type handling
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if len(restores.Pods) > 0 {
s.updates <- *restores
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
// omitted ...
return nil
}
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{}
restorePods := []*v1.Pod{}
// These pods live in the pod source, which differs somewhat from the cache: pods in the cache have all been updated already
pods := s.pods[source]
if pods == nil {
pods = make(map[types.UID]*v1.Pod)
}
// The pod source's records are brought up to date after this point (firstSeenTime, source
// annotation); within the source they are kept as key-value pairs.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
recordFirstSeenTime(ref)
pods[ref.UID] = ref
// Any pod not found in the source is a newly added pod that needs to be created
addPods = append(addPods, ref)
}
}
update := change.(kubetypes.PodUpdate)
switch update.Op {
// omitted ...
// We only care about this case because the podConfig coming from list-and-watch always has Op SET, never anything else
case kubetypes.SET:
klog.V(4).Infof("Setting pods for source %s", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
// a pod present in the source but absent from the cache's new set must be removed
removePods = append(removePods, existing)
}
}
// omitted ...
}
s.pods[source] = pods
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}
return adds, updates, deletes, removes, reconciles, restores
}
// Only the status changed: reconcile.
// DeletionTimestamp is non-nil: graceful delete.
// Fields other than status differ and DeletionTimestamp is nil: update.
func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
// 1. this is a reconcile
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
if !podsDifferSemantically(existing, ref) {
// this is not an update
// Only check reconcile when it is not an update, because if the pod is going to
// be updated, an extra reconcile is unnecessary
if !reflect.DeepEqual(existing.Status, ref.Status) {
// Pod with changed pod status needs reconcile, because kubelet should
// be the source of truth of pod status.
existing.Status = ref.Status
needReconcile = true
}
return
}
// Overwrite the first-seen time with the existing one. This is our own
// internal annotation, there is no need to update.
ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]
existing.Spec = ref.Spec
existing.Labels = ref.Labels
existing.DeletionTimestamp = ref.DeletionTimestamp
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
existing.Status = ref.Status
updateAnnotations(existing, ref)
// 2. this is a graceful delete
if ref.DeletionTimestamp != nil {
needGracefulDelete = true
} else {
// 3. this is an update
needUpdate = true
}
return
}
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
// omitted ...
}
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// omitted ...
kl.syncLoop(updates, kl)
}
// In this way everything seen from the apiserver, both listed and watched, is dispatched here for the kubelet to handle case by case
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.Errorf("Kubelet does not support snapshot update")
}
if u.Op != kubetypes.RESTORE {
// If the update type is RESTORE, it means that the update is from
// the pod checkpoints and may be incomplete. Do not mark the
// source as ready.
// Mark the source ready after receiving at least one update from the
// source. Once all the sources are marked ready, various cleanup
// routines will start reclaiming resources. It is important that this
// takes place only after kubelet calls the update handler to process
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source)
}
}
return true
}
[x] list and watch pull pod information from the apiserver, update the local cache, and push the full cache to the source channel
[x] the pod source compares its own pods against those pushed from the cache, decides how each pod should be handled, and delivers the result to the updates channel
[x] the kubelet reads the typed updates from the updates channel and handles each kind accordingly