
pod lifecycle: create

Preface

As mentioned last time, before studying pod creation you need to understand how the informer works. Beyond that, you also need to read parts of the apiserver code; otherwise the kubelet code becomes unusually hard to follow. In particular, some functions return at different points depending on the pod's phase, and without tracing a concrete pod the whole thing is painful to read.
Fortunately, connecting all the pieces is not complicated.

A little preparation

  1. Manually create a pod
  2. Add some log statements in the kubelet and the apiserver, then rebuild
  3. If needed, add a small artificial delay when the kubelet syncs status

The YAML used to create the pod:

---
apiVersion: v1
kind: Pod
metadata:
  name: hello-world-app
spec:
  containers:
  - name: stress
    image: u-stress:0.1

Add log statements in the following places:

  1. client-go watch
  2. kubelet status manager syncPod

Pod creation flow in detail

  1. The kubelet watches a new pod appear; the informer pushes it onto the podConfig updates channel, and the kubelet picks it up from there.
I1207 11:44:07.921744   75035 reflector.go:398] kiel add obj: &Pod{ObjectMeta:k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{Name:hello-world-app,GenerateName:,Namespace:default,SelfLink:/api/v1/namespaces/default/pods/hello-world-app,UID:59bc88fd-f9d2-11e8-a6f0-567909393959,ResourceVersion:42127503,Generation:0,CreationTimestamp:2018-12-07 11:44:07 +0800 CST,DeletionTimestamp:<nil>,DeletionGracePeriodSeconds:nil,Labels:map[string]string{},Annotations:map[string]string{kubectl.kubernetes.io/last-applied-configuration: {"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"hello-world-app","namespace":"default"},"spec":{"containers":[{"image":"u-stress:0.1","name":"nginx"}]}},},OwnerReferences:[],Finalizers:[],ClusterName:,Initializers:nil,},Spec:PodSpec{Volumes:[{default-token-f7djb {nil nil nil nil nil SecretVolumeSource{SecretName:default-token-f7djb,Items:[],DefaultMode:*420,Optional:nil,} nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil}}],Containers:[{nginx u-stress:0.1 [] []  [] [] [] {map[] map[]} [{default-token-f7djb true /var/run/secrets/kubernetes.io/serviceaccount  <nil>}] [] nil nil nil /dev/termination-log File IfNotPresent nil false false false}],RestartPolicy:Always,TerminationGracePeriodSeconds:*30,ActiveDeadlineSeconds:nil,DNSPolicy:ClusterFirst,NodeSelector:map[string]string{},ServiceAccountName:default,DeprecatedServiceAccount:default,NodeName:svr3438hw1288,HostNetwork:false,HostPID:false,HostIPC:false,SecurityContext:&PodSecurityContext{SELinuxOptions:nil,RunAsUser:nil,RunAsNonRoot:nil,SupplementalGroups:[],FSGroup:nil,},ImagePullSecrets:[],Hostname:,Subdomain:,Affinity:nil,SchedulerName:default-scheduler,InitContainers:[],AutomountServiceAccountToken:nil,Tolerations:[{node.kubernetes.io/not-ready Exists  NoExecute 0xc001bcd850} {node.kubernetes.io/unreachable Exists  NoExecute 0xc001bcdaa0}],HostAliases:[],PriorityClassName:,Priority:nil,DNSConfig:nil,},Status:PodStatus{Phase:Pending,Conditions:[{PodScheduled True 0001-01-01 00:00:00 +0000 UTC 2018-12-07 11:44:07 +0800 CST  }],Message:,Reason:,HostIP:,PodIP:,StartTime:<nil>,ContainerStatuses:[],QOSClass:BestEffort,InitContainerStatuses:[],},}

I1207 11:44:07.922373   75035 kubelet.go:1857] SyncLoop (ADD, "api"): "hello-world-app_default(59bc88fd-f9d2-11e8-a6f0-567909393959)"
  2. The kubelet's handling of the pod ADD involves the podManager, the proberManager, and the podWorkers. The podManager generally stores secrets, configMaps, checkpoints, mirror pods, and so on (it also works with the volumeManager to create the pod's volumes); the proberManager is responsible for readiness and liveness probes and restart-policy-related behavior; the podWorkers mainly compare the pod received from the apiserver with the local podCache and then call the kubelet's syncPod to do the work:

First, of course, the relevant logs:

I1212 16:44:16.766293   75717 kubelet.go:1857] SyncLoop (ADD, "api"): "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"

I1212 16:44:16.766470   75717 status_manager.go:367] Status Manager: adding pod: "1be94121-fdea-11e8-99b5-567909393919", with status: ('\x01', {Pending [{Initialized True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST  } {Ready False 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST ContainersNotReady containers with unready status: [nginx]} {PodScheduled True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST  }]   10.21.208.99  2018-12-12 16:44:16 +0800 CST [] [{nginx {&ContainerStateWaiting{Reason:ContainerCreating,Message:,} nil nil} {nil nil nil} false 0 u-stress:0.1  }] BestEffort}) to podStatusChannel
I1212 16:44:16.766573   75717 status_manager.go:146] Status Manager: syncing pod: "1be94121-fdea-11e8-99b5-567909393919", with status: (1, {Pending [{Initialized True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST  } {Ready False 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST ContainersNotReady containers with unready status: [nginx]} {PodScheduled True 0001-01-01 00:00:00 +0000 UTC 2018-12-12 16:44:16 +0800 CST  }]   10.21.208.99  2018-12-12 16:44:16 +0800 CST [] [{nginx {&ContainerStateWaiting{Reason:ContainerCreating,Message:,} nil nil} {nil nil nil} false 0 u-stress:0.1  }] BestEffort}) from podStatusChannel
// Delivered from the updates channel to the kubelet's HandlePodAdditions
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
// omitted ...
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		
		}
// omitted ...
	}
// omitted ...
	return true
}

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
        // A batch of pods may be created around the same time, so sort them by creation time here
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
                // There is an upper limit here; exceeding it normally just logs a warning
		// Responsible for checking limits in resolv.conf
		if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
			kl.dnsConfigurer.CheckLimitsForResolvConf()
		}
                // For a freshly started kubelet, the podManager contains nothing yet
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)
                // Normally a non-static pod does not take this branch
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
                // A newly created pod always enters this branch; other cases may skip it
		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.

			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)

			// Check if we can admit the pod; if not, reject it.
                         // Resource constraints can cause a reject, typically after eviction parameters were changed and the kubelet restarted
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
                // Actually hand the pod off for processing
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}
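
The sort mentioned above uses sliceutils.PodsByCreationTime, which is simply sort.Interface keyed on the creation timestamp. A self-contained illustration with a local stand-in type (not the real v1.Pod):

package main

import (
	"fmt"
	"sort"
	"time"
)

// pod is a local stand-in holding only what the sort needs (not the real v1.Pod).
type pod struct {
	name    string
	created time.Time
}

// podsByCreationTime mirrors the idea of sliceutils.PodsByCreationTime: oldest pod first.
type podsByCreationTime []pod

func (p podsByCreationTime) Len() int           { return len(p) }
func (p podsByCreationTime) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p podsByCreationTime) Less(i, j int) bool { return p[i].created.Before(p[j].created) }

func main() {
	now := time.Now()
	pods := podsByCreationTime{
		{name: "newer", created: now},
		{name: "older", created: now.Add(-time.Minute)},
	}
	sort.Sort(pods)
	for _, p := range pods {
		fmt.Println(p.name) // prints "older" first, then "newer"
	}
}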


func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
         // This check is required because dispatchWork has multiple callers; for a newly created pod this branch is skipped
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			// If the pod is in a terminated state, there is no pod worker to
			// handle the work item. Check if the DeletionTimestamp has been
			// set, and force a status update to trigger a pod deletion request
			// to the apiserver.
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.
        // podWorkers and podManager are not related hierarchically. podWorkers mainly interact with other kubelet components, typically the statusManager and the container runtime (docker), plus a retry queue for successful and failed syncs so pod processing cannot get stuck. A podWorker lives as long as its pod; once the pod is gone, the podWorker is removed as well
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
			}
		},
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
// Compare the podCache with the pod received from the apiserver to decide whether to sync the podCache status to the apiserver.
// For a newly created pod there is no entry in the podCache yet, so the status is an empty value;
// its default is roughly: status: &PodStatus{ID: id}
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// This is a blocking call that would return only if the cache
			// has an entry for the pod that is newer than minRuntimeCache
			// Time. This ensures the worker doesn't start syncing until
			// after the cache is at least newer than the finished time of
			// the previous sync.
                         // Get the latest correct status; this depends on when the PLEG last updated the podCache.
                         // Normally (with the PLEG running) this returns only the default value during pod creation
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				// This is the legacy event thrown by manage pod loop
				// all other events are now dispatched from syncPodFn
				p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
				return err
			}
                        // syncPodFn here points at the kubelet's syncPod
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
			klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
		}
                // The podWorkers retry mechanism: even successfully synced pods are requeued for a resync;
               // failed ones are requeued after a 10s delay
		p.wrapUp(update.Pod.UID, err)
	}
}
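
As an aside, the per-pod worker pattern above (one serialized channel and goroutine per pod UID, created lazily) can be illustrated with a small self-contained sketch. This is not kubelet code; the update type and syncFn are stand-ins for UpdatePodOptions and syncPod:

package main

import (
	"fmt"
	"sync"
	"time"
)

// update is a stand-in for the kubelet's UpdatePodOptions (assumption, not the real type).
type update struct {
	podUID string
	reason string
}

// workers is a stripped-down analogue of podWorkers: one channel plus one goroutine per pod UID.
type workers struct {
	mu      sync.Mutex
	updates map[string]chan update
	syncFn  func(update) error // stand-in for syncPodFn (the kubelet's syncPod)
}

func newWorkers(syncFn func(update) error) *workers {
	return &workers{updates: make(map[string]chan update), syncFn: syncFn}
}

// UpdatePod lazily creates the per-pod goroutine and enqueues the update, so syncs
// for the same pod are serialized while different pods are handled in parallel.
func (w *workers) UpdatePod(u update) {
	w.mu.Lock()
	ch, ok := w.updates[u.podUID]
	if !ok {
		ch = make(chan update, 1)
		w.updates[u.podUID] = ch
		go func() {
			for next := range ch {
				if err := w.syncFn(next); err != nil {
					fmt.Printf("error syncing pod %s, skipping: %v\n", next.podUID, err)
				}
			}
		}()
	}
	w.mu.Unlock()
	ch <- u
}

func main() {
	w := newWorkers(func(u update) error {
		fmt.Printf("sync pod %s (%s)\n", u.podUID, u.reason)
		time.Sleep(10 * time.Millisecond)
		return nil
	})
	w.UpdatePod(update{podUID: "uid-1", reason: "ADD"})
	w.UpdatePod(update{podUID: "uid-2", reason: "ADD"})
	w.UpdatePod(update{podUID: "uid-1", reason: "UPDATE"})
	time.Sleep(200 * time.Millisecond) // crude wait for the workers to drain
}

The real podWorkers adds the podCache gate (GetNewerThan) and the wrapUp/requeue logic shown above, but the channel-per-UID shape is the same.
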
  3. The kubelet's syncPod flow. During creation it mainly interacts with the statusManager, which pushes the pod status to the apiserver, and with the runtime, which creates the sandbox and the containers:
    Relevant logs:
I1212 16:44:16.781736   75717 volume_manager.go:343] Waiting for volumes to attach and mount for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"

I1212 16:44:16.909704   75717 reconciler.go:217] operationExecutor.VerifyControllerAttachedVolume started for volume "default-token-f7djb" (UniqueName: "kubernetes.io/secret/1be94121-fdea-11e8-99b5-567909393919-default-token-f7djb") pod "hello-world-app" (UID: "1be94121-fdea-11e8-99b5-567909393919")

I1212 16:44:17.009958   75717 reconciler.go:262] operationExecutor.MountVolume started for volume "default-token-f7djb" (UniqueName: "kubernetes.io/secret/1be94121-fdea-11e8-99b5-567909393919-default-token-f7djb") pod "hello-world-app" (UID: "1be94121-fdea-11e8-99b5-567909393919")
I1212 16:44:17.009999   75717 secret.go:186] Setting up volume default-token-f7djb for pod 1be94121-fdea-11e8-99b5-567909393919 at /var/lib/k8s/kubelet/pods/1be94121-fdea-11e8-99b5-567909393919/volumes/kubernetes.io~secret/default-token-f7djb

I1212 16:44:17.081922   75717 volume_manager.go:372] All volumes are attached and mounted for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"


I1212 16:44:17.082113   75717 kuberuntime_manager.go:385] No sandbox for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)" can be found. Need to start a new one
I1212 16:44:17.082124   75717 kuberuntime_manager.go:571] computePodActions got {KillPod:true CreateSandbox:true SandboxID: Attempt:0 NextInitContainerToStart:nil ContainersToStart:[0] ContainersToKill:map[]} for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:17.082157   75717 kuberuntime_manager.go:580] SyncPod received new pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)", will create a sandbox for it
I1212 16:44:17.082164   75717 kuberuntime_manager.go:589] Stopping PodSandbox for "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)", will start new one
I1212 16:44:17.082182   75717 kuberuntime_manager.go:641] Creating sandbox for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:17.106342   75717 docker_service.go:441] Setting cgroup parent to: "/kubepods/besteffort/pod1be94121-fdea-11e8-99b5-567909393919"

I1212 16:44:17.498006   75717 docker_sandbox.go:658] Will attempt to re-write config file /var/lib/docker/containers/a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657/resolv.conf with:
[nameserver 10.96.0.10 search default.com  options ndots:5]
I1212 16:44:17.498134   75717 plugins.go:412] Calling network plugin cni to set up pod "hello-world-app_default"
I1212 16:44:17.498878   75717 manager.go:970] Added container: "/kubepods/besteffort/pod1be94121-fdea-11e8-99b5-567909393919/a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657" (aliases: [k8s_POD_hello-world-app_default_1be94121-fdea-11e8-99b5-567909393919_0 a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657], namespace: "docker")
I1212 16:44:17.498913   75717 cni.go:284] Got netns path /proc/75921/ns/net
I1212 16:44:17.498919   75717 cni.go:285] Using podns path default
I1212 16:44:17.499008   75717 cni.go:256] About to add CNI network cni-loopback (type=loopback)

I1212 16:44:21.804415   75717 status_manager.go:484] Status for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)" updated successfully: (1, {Phase:Pending Conditions:[{Type:Initialized Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2018-12-12 16:44:16 +0800 CST Reason: Message:} {Type:Ready Status:False LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2018-12-12 16:44:16 +0800 CST Reason:ContainersNotReady Message:containers with unready status: [nginx]} {Type:PodScheduled Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2018-12-12 16:44:16 +0800 CST Reason: Message:}] Message: Reason: HostIP:10.21.208.99 PodIP: StartTime:2018-12-12 16:44:16 +0800 CST InitContainerStatuses:[] ContainerStatuses:[{Name:nginx State:{Waiting:&ContainerStateWaiting{Reason:ContainerCreating,Message:,} Running:nil Terminated:nil} LastTerminationState:{Waiting:nil Running:nil Terminated:nil} Ready:false RestartCount:0 Image:u-stress:0.1 ImageID: ContainerID:}] QOSClass:BestEffort})
I1212 16:44:21.970512   75717 kuberuntime_manager.go:655] Created PodSandbox "a1eb9fb171d9fa3fdf621d318f2803084e170e9da2a95a0f391ee56a7eda9657" for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)"
I1212 16:44:21.974708   75717 kuberuntime_manager.go:674] Determined the ip "10.5.150.103" for pod "hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)" after sandbox changed
I1212 16:44:21.974790   75717 kuberuntime_manager.go:725] Creating container &Container{Name:nginx,Image:u-stress:0.1,Command:[],Args:[],WorkingDir:,Ports:[],Env:[],Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[{default-token-f7djb true /var/run/secrets/kubernetes.io/serviceaccount  <nil>}],LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:IfNotPresent,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[],TerminationMessagePolicy:File,VolumeDevices:[],} in pod hello-world-app_default(1be94121-fdea-11e8-99b5-567909393919)
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// pull out the required options
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType

// omitted ...

	// Generate final API pod status with pod and status manager status
        // This is important: it is what the statusManager eventually syncs to the apiserver.
        // For a freshly created pod the phase here is normally Pending
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
	// set pod IP to hostIP directly in runtime.GetPodStatus
	podStatus.IP = apiPodStatus.PodIP

	// Record the time it takes for the pod to become running.
	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
		!firstSeenTime.IsZero() {
		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	}

	runnable := kl.canRunPod(pod)
	if !runnable.Admit {
		// Pod is not runnable; update the Pod and Container statuses to why.
		apiPodStatus.Reason = runnable.Reason
		apiPodStatus.Message = runnable.Message
		// Waiting containers are not creating.
		const waitingReason = "Blocked"
		for _, cs := range apiPodStatus.InitContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
		for _, cs := range apiPodStatus.ContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
	}

        // Set the pod status inside the status manager, ready to be synced to the apiserver.
        // For a newly created pod this status only contains some image information
	// Update status in the status manager
	kl.statusManager.SetPodStatus(pod, apiPodStatus)

        // omitted ...

        // For a newly created pod, some preparation is needed before container creation:
        // 1. is the network plugin ready
        // 2. data directories
        // 3. volumes
        // 4. image pull secrets

	// If the network plugin is not ready, only start the pod if it uses the host network
	if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
	}

        // omitted ...

	// Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
		klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
		return err
	}

	// Volume manager will not mount volumes for terminated pods
	if !kl.podIsTerminated(pod) {
		// Wait for volumes to attach/mount
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
			klog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
			return err
		}
	}

	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)


        // Call the runtime (docker) to create the pod: the sandbox, the user's containers, and so on
	// Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err := result.Error(); err != nil {
		// Do not return error if the only failures were pods in backoff
		for _, r := range result.SyncResults {
			if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
				// Do not record an event here, as we keep all event logging for sync pod failures
				// local to container runtime so we get better errors
				return err
			}
		}

		return nil
	}

	return nil
}
  4. Step 3 left several threads hanging: generateAPIPodStatus, kl.volumeManager.WaitForAttachAndMount, and kl.containerRuntime.SyncPod. These are the three most important parts of pod creation: they describe the path every pod must take, and they also show how the pod status evolves inside the kubelet. Let's go through them in that order:
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
	klog.V(3).Infof("Generating status for %q", format.Pod(pod))

        // Set the pod IP, qosClass, containerStatus/initContainerStatus and pod conditions in the API pod status;
        // the containerStatus carries basic container information such as container ID and start/stop times
	s := kl.convertStatusToAPIStatus(pod, podStatus)
        // omitted ...
	// Assume info is ready to process
	spec := &pod.Spec
	allStatus := append(append([]v1.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
        // The phase is the kubelet's state machine for the pod, typically one of
        // PodPending, PodRunning, PodSucceeded, PodFailed.
        // The phase is derived from the container statuses, which themselves arrive with some delay,
       // so a newly created pod (no containers created via the runtime yet) has phase Pending
	s.Phase = getPhase(spec, allStatus)
        // omitted ...
        // Set the readiness and liveness states
	kl.probeManager.UpdatePodStatus(pod.UID, s)
        // Set the pod conditions
	s.Conditions = append(s.Conditions, status.GeneratePodInitializedCondition(spec, s.InitContainerStatuses, s.Phase))
	s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.Conditions, s.ContainerStatuses, s.Phase))
	s.Conditions = append(s.Conditions, status.GenerateContainersReadyCondition(spec, s.ContainerStatuses, s.Phase))
	// Status manager will take care of the LastTransitionTimestamp, either preserve
	// the timestamp from apiserver, or set a new one. When kubelet sees the pod,
	// `PodScheduled` condition must be true.
        // PodScheduled is true because the pod has already been bound to this node; by the time the kubelet watches the pod, scheduling has finished
	s.Conditions = append(s.Conditions, v1.PodCondition{
		Type:   v1.PodScheduled,
		Status: v1.ConditionTrue,
	})

	if kl.kubeClient != nil {
		hostIP, err := kl.getHostIPAnyWay()
		if err != nil {
			klog.V(4).Infof("Cannot get host IP: %v", err)
		} else {
			s.HostIP = hostIP.String()
			if kubecontainer.IsHostNetworkPod(pod) && s.PodIP == "" {
				s.PodIP = hostIP.String()
			}
		}
	}

	return *s
}
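
Since the comments above lean on how the phase is derived from container statuses, here is a simplified, self-contained sketch of that idea. It is not the real getPhase (which also considers init containers and the restart policy); the types are local stand-ins:

package main

import "fmt"

// containerState is a simplified stand-in for a container's state in v1.ContainerStatus.
type containerState int

const (
	waiting containerState = iota
	running
	succeeded
	failed
)

// derivePhase captures the spirit of the kubelet's getPhase: the pod phase summarizes
// the container states (init containers and restart policy are ignored in this sketch).
func derivePhase(containers []containerState) string {
	var nRunning, nSucceeded, nFailed, nWaiting int
	for _, c := range containers {
		switch c {
		case running:
			nRunning++
		case succeeded:
			nSucceeded++
		case failed:
			nFailed++
		default:
			nWaiting++
		}
	}
	switch {
	case len(containers) == 0 || nWaiting > 0:
		// Nothing has been created by the runtime yet -- the "newly created pod" case above.
		return "Pending"
	case nRunning > 0:
		return "Running"
	case nFailed > 0:
		return "Failed"
	case nSucceeded == len(containers):
		return "Succeeded"
	default:
		return "Unknown"
	}
}

func main() {
	fmt.Println(derivePhase(nil))                                  // Pending
	fmt.Println(derivePhase([]containerState{waiting}))            // Pending
	fmt.Println(derivePhase([]containerState{running, waiting}))   // Pending (one container still waiting)
	fmt.Println(derivePhase([]containerState{running, succeeded})) // Running
}
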
// The volumeManager is logically simple but made of a lot of moving parts. It is responsible for creating and tearing down the pod's volumes. It usually cooperates with the podManager and the statusManager to extract the basic information from the pod spec and the pod's status respectively, and decides what to do with the pod's volumes based on that
func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
	if pod == nil {
		return nil
	}

        // Parse the expected volumes out of the pod spec
	expectedVolumes := getExpectedVolumes(pod)
	if len(expectedVolumes) == 0 {
		// No volumes to verify
		return nil
	}

	klog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
       // Get the pod's unique name (its UID)
	uniquePodName := util.GetUniquePodName(pod)

	// Some pods expect to have Setup called over and over again to update.
	// Remount plugins for which this is true. (Atomically updating volumes,
	// like Downward API, depend on this to update the contents of the volume).
	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)

         // Compare the already-mounted volumes with the expected ones; within a 2m3s window,
         // return normally once everything is mounted
	err := wait.PollImmediate(
		podAttachAndMountRetryInterval,
		podAttachAndMountTimeout,
		vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))

	if err != nil {
		// Timeout expired
		unmountedVolumes :=
			vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
		// Also get unattached volumes for error message
		unattachedVolumes :=
			vm.getUnattachedVolumes(expectedVolumes)

		if len(unmountedVolumes) == 0 {
			return nil
		}

		return fmt.Errorf(
			"timeout expired waiting for volumes to attach or mount for pod %q/%q. list of unmounted volumes=%v. list of unattached volumes=%v",
			pod.Namespace,
			pod.Name,
			unmountedVolumes,
			unattachedVolumes)
	}

	klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
	return nil
}
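
The 2m3s window mentioned above comes from wait.PollImmediate with the kubelet's podAttachAndMountRetryInterval and podAttachAndMountTimeout constants. A minimal standalone example of the same polling primitive, with a made-up condition standing in for verifyVolumesMountedFunc:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()
	// Poll immediately, then on an interval, until the condition returns true,
	// giving up after 2m3s -- the same shape as WaitForAttachAndMount above.
	err := wait.PollImmediate(300*time.Millisecond, 2*time.Minute+3*time.Second, func() (bool, error) {
		// Stand-in condition: pretend the volumes are mounted after one second.
		return time.Since(start) > time.Second, nil
	})
	if err != nil {
		fmt.Println("timeout expired waiting for volumes:", err)
		return
	}
	fmt.Println("all volumes are attached and mounted")
}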

// Next comes the heart of pod creation: calling the runtime to create the sandbox and the containers.
// For a pod being created for the first time it is simply created, but computePodActions still deserves attention:
// it mostly handles cleanup for pods whose previous creation attempt failed,
// which may include recreating the sandbox, removing dirty containers and recreating them, and so on
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create init containers.
//  6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))

       // omitted ...
       

	podIP := ""
	if podStatus != nil {
		podIP = podStatus.IP
	}

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
                // Call the RPC method to create the sandbox.
                // Creating a sandbox through docker generally goes like this:
                // 1. generate the podSandboxConfig
                // 2. RPC call to create the sandbox from that config
                // 3. the RPC server receives the request and forwards it to the runtime
                // 4. the runtime parses the config
                // 5. the runtime pulls the image
                // 6. generate the sandbox container config, add the podsandbox labels, etc.
                // 7. call docker to create the sandbox container
                // 8. mark the sandbox network as not ready and create the sandbox checkpoint
                // 9. start the sandbox container
                // 10. rewrite resolv.conf
                // 11. call the CNI plugin to set up the sandbox network
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		if err != nil {
			createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
			klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox: %v", err)
			return
		}
		klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

		podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
		if err != nil {
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
			klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
			result.Fail(err)
			return
		}

		// If we ever allow updating a pod from non-host-network to
		// host-network, we may use a stale IP.
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
			podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
			klog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
		}
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}


         // With the sandbox config in hand, the initContainers and containers can be created.
         // Creating a container involves:
         // 1. pulling the image
         // 2. generating the container config and creating the container
         // 3. starting the container
         // 4. running the post-start hook
	// Step 5: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)
		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
			return
		}

		klog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
			startContainerResult.Fail(err, msg)
			utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
			return
		}

		// Successfully started the container; clear the entry in the failure
		klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
	}

	// Step 6: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
			continue
		}

		klog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).Infof("container start failed: %v: %s", err, msg)
			default:
				utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
			}
			continue
		}
	}

	return
}
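
computePodActions, referenced at the top of this function, decides how much of the pod must be (re)built. A heavily simplified, self-contained sketch of that decision, using local stand-in types rather than the kubelet or CRI ones:

package main

import "fmt"

// sandboxStatus and containerStatus are simplified stand-ins for the runtime's
// view of the pod (assumptions, not kubelet types).
type sandboxStatus struct {
	exists bool
	ready  bool
}

type containerStatus struct {
	name    string
	running bool
}

type podActions struct {
	CreateSandbox     bool
	ContainersToStart []string
}

// computeActions mirrors the idea behind computePodActions: no sandbox (or a dead one)
// means creating a new sandbox and starting every container; otherwise only the
// containers that are not running need to be started.
func computeActions(sandbox sandboxStatus, want []string, have []containerStatus) podActions {
	actions := podActions{}
	if !sandbox.exists || !sandbox.ready {
		actions.CreateSandbox = true
		actions.ContainersToStart = append([]string{}, want...)
		return actions
	}
	runningSet := map[string]bool{}
	for _, c := range have {
		if c.running {
			runningSet[c.name] = true
		}
	}
	for _, name := range want {
		if !runningSet[name] {
			actions.ContainersToStart = append(actions.ContainersToStart, name)
		}
	}
	return actions
}

func main() {
	// A brand new pod: no sandbox yet, so everything gets created.
	fmt.Printf("%+v\n", computeActions(sandboxStatus{}, []string{"nginx"}, nil))
	// Sandbox alive, one container missing: only that container is started.
	fmt.Printf("%+v\n", computeActions(sandboxStatus{exists: true, ready: true},
		[]string{"nginx", "sidecar"}, []containerStatus{{name: "nginx", running: true}}))
}
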
  5. Once the runtime has created the sandbox and the containers, a pod instance actually exists, but a few steps remain:
    the PLEG drives the status phase to Running, and that status is synced to the apiserver.
    As before, the logs first:

pod lifecycle: informer

Pod creation overview

  1. The pod is scheduled to a node
  2. The kubelet receives the pod creation request
  3. Volumes, the sandbox, and the containers are created
  4. cAdvisor monitoring is enabled, and the pod status is reported to the apiserver

Prerequisites

Before we can understand how the kubelet drives the pod lifecycle through watch events, we need to understand some built-in machinery, the most important piece being list and watch.
It observes pod information and changes from a given source, compares them with the local cache, and drives the kubelet to perform ADD, UPDATE, and REMOVE actions.

Its basic logic is:

  1. List all resources from the source and load them into its own cache, stored as key-value pairs
  2. Watch specific resources from the source; since the kubelet only watches nodes and pods, only pods matter here, and the event type gives a simple classification
  3. Before handing off to the kubelet, compare the watched pod state with the local pod state: a non-empty DeletionTimestamp counts as (DELETE, API), a status mismatch counts as (RECONCILE, API), a pod found to have gone missing after the comparison counts as (REMOVE, API); this article is about (ADD, API) and (UPDATE, API)

First, the pod sources have to be defined. There are currently three: StaticPodPath, StaticPodURL, and the apiserver. Here I only care about the most common case: pods coming from the apiserver.

func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

// omitted...

	if kubeDeps.KubeClient != nil {
		klog.Infof("Watching apiserver")
		if updatechannel == nil {
			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		}
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
	}
	return cfg, nil
}

What follows is a bridging step: the list and watch functions pull information from the apiserver and store it in the local cache first; whenever the local cache changes, its contents are pushed through the source channel to the pre-processing logic.

  1. Instantiate the source channel; it is merged by the mux and forwarded to a new channel, PodConfig.updates
updatechannel = cfg.Channel(kubetypes.ApiserverSource)

// Instantiate the channel
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.Channel(source)
}

// Add a new source, keep listening on its channel, and process and re-dispatch whatever arrives
func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}
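
The Mux here is essentially a fan-in: each source gets its own channel plus a listener goroutine, and everything converges on a single merge step. A minimal self-contained sketch of the same pattern, where the merge callback stands in for podStorage.Merge:

package main

import (
	"fmt"
	"sync"
)

// mux fans several named source channels into one merge callback,
// mirroring the shape of config.Mux (simplified; no wait.Until, no podStorage).
type mux struct {
	mu      sync.Mutex
	sources map[string]chan interface{}
	merge   func(source string, change interface{})
}

func newMux(merge func(string, interface{})) *mux {
	return &mux{sources: make(map[string]chan interface{}), merge: merge}
}

// Channel returns (and lazily creates) the channel for a source and starts
// a goroutine that forwards everything sent on it to the merge callback.
func (m *mux) Channel(source string) chan interface{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	go func() {
		for change := range ch {
			m.merge(source, change)
		}
	}()
	return ch
}

func main() {
	updates := make(chan string, 10)
	m := newMux(func(source string, change interface{}) {
		// Stand-in for podStorage.Merge: classify and forward to the updates channel.
		updates <- fmt.Sprintf("from %s: %v", source, change)
	})

	m.Channel("api") <- "pod hello-world-app ADD"
	m.Channel("file") <- "static pod ADD"

	fmt.Println(<-updates)
	fmt.Println(<-updates)
}
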
  2. Instantiate the list and watch functions and pass whatever they observe out through the channel from step 1
// First instantiate the list and watch functions; the concrete functions are built for the given resource type
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

// The resource type is already fixed here: it is the pod resource
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do().
			Get()
	}
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch()
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

// Note the send function: it is how the list and watch results are passed through; updates is the channel from step 1.
// Once initialization is done, the reflector starts running.
// Also note the type pushed into the channel: PodUpdate, with Op set to kubetypes.SET
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}
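
To see this wiring outside of the kubelet, here is a minimal standalone program assembled from the same pieces (NewListWatchFromClient, NewUndeltaStore, NewReflector). The kubeconfig path and node name are placeholders, and it assumes a client-go version contemporaneous with the code quoted here; it is a sketch, not kubelet code:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholders: adjust the kubeconfig path and node name for your environment.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// The same selector kubelet uses: only pods bound to this node ("spec.nodeName").
	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods",
		metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", "svr3438hw1288"))

	// Like newSourceApiserverFromLW: every change pushes the full store contents.
	send := func(objs []interface{}) {
		fmt.Printf("store now holds %d pods:\n", len(objs))
		for _, o := range objs {
			pod := o.(*v1.Pod)
			fmt.Printf("  %s/%s phase=%s\n", pod.Namespace, pod.Name, pod.Status.Phase)
		}
	}
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)

	stop := make(chan struct{})
	go r.Run(stop)
	select {} // block forever; Ctrl-C to exit
}
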
  3. List and resync: (periodically) load all resources into the local cache
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: "0"}
	r.metrics.numberOfLists.Inc()
	start := r.clock.Now()
	list, err := r.listerWatcher.List(options)
	if err != nil {
		return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
	}
	r.metrics.listDuration.Observe(time.Since(start).Seconds())
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
	}
	resourceVersion = listMetaInterface.GetResourceVersion()
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
	}
	r.metrics.numberOfItemsInList.Observe(float64(len(items)))

        // Store the items in the local cache as key-value pairs via Replace; after the replace, the full list in the cache is pushed to the source channel

	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
	}
	r.setLastSyncResourceVersion(resourceVersion)

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
                        // Although the resync logic is written here, the cache the kubelet uses does not actually support resync
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

// omitted ...
}

// After Replace, all data in the local cache is pushed to the source channel in full
func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
	if err := u.Store.Replace(list, resourceVersion); err != nil {
		return err
	}
	u.PushFunc(u.Store.List())
	return nil
}
  4. Watch and dispatch
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
//  omitted ...
	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
		}

		r.metrics.numberOfWatches.Inc()
                // Given the context, what is being watched here are pod changes
		w, err := r.listerWatcher.Watch(options)
// omitted ...

		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
			}
			return nil
		}
	}
}

// Dispatch according to the type of the watched event
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
// omitted
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
				utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
				continue
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
// Update the cache according to the watched event type.
// Added: insert a new key-value pair into the cache (plus some index additions, not detailed here).
// Modified: update the key-value pair and its indexes.
// Deleted: remove the key-value pair and its indexes.
// In all three cases, once the cache has been updated, everything in the cache is pushed to the source channel
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Now().Sub(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		r.metrics.numberOfShortWatches.Inc()
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
	return nil
}
  5. The source channel receives the list and watch results and starts dispatching them by op type
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
// Classify here and handle the pods pushed over from the cache.
// Keep in mind that the cache itself has already been updated at this point
	adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
// Dispatch by type to podConfig's updates channel; the kubelet reads from that channel and handles each kind separately
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if len(restores.Pods) > 0 {
			s.updates <- *restores
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}

// omitted ...
	return nil
}

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
	s.podLock.Lock()
	defer s.podLock.Unlock()

	addPods := []*v1.Pod{}
	updatePods := []*v1.Pod{}
	deletePods := []*v1.Pod{}
	removePods := []*v1.Pod{}
	reconcilePods := []*v1.Pod{}
	restorePods := []*v1.Pod{}

// These pods live in the per-source map, which differs from the cache: the pods in the cache have already been updated
	pods := s.pods[source]
	if pods == nil {
		pods = make(map[types.UID]*v1.Pod)
	}

// The pods in the source map are updated after this point, including the first-seen time and the source annotation;
// they are stored in the source map as key-value pairs
	updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
		filtered := filterInvalidPods(newPods, source, s.recorder)
		for _, ref := range filtered {
			// Annotate the pod with the source before any comparison.
			if ref.Annotations == nil {
				ref.Annotations = make(map[string]string)
			}
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
			if existing, found := oldPods[ref.UID]; found {
				pods[ref.UID] = existing
				needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
				if needUpdate {
					updatePods = append(updatePods, existing)
				} else if needReconcile {
					reconcilePods = append(reconcilePods, existing)
				} else if needGracefulDelete {
					deletePods = append(deletePods, existing)
				}
				continue
			}
			recordFirstSeenTime(ref)
			pods[ref.UID] = ref
// Pods not found in the source map are new and need to be created
			addPods = append(addPods, ref)
		}
	}

	update := change.(kubetypes.PodUpdate)
	switch update.Op {
// omitted ...
        // Only this case matters, because podConfig updates coming from list/watch are always of type SET, nothing else
	case kubetypes.SET:
		klog.V(4).Infof("Setting pods for source %s", source)
		s.markSourceSet(source)
		// Clear the old map entries by just creating a new map
		oldPods := pods
		pods = make(map[types.UID]*v1.Pod)
		updatePodsFunc(update.Pods, oldPods, pods)
		for uid, existing := range oldPods {
			if _, found := pods[uid]; !found {
				// this is a delete
// The source map has a pod that the cache no longer has, so it must be removed
				removePods = append(removePods, existing)
			}
		}
// omitted ...

	}

	s.pods[source] = pods

	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
	deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
	removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
	restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}

	return adds, updates, deletes, removes, reconciles, restores
}

// Only the status changed: reconcile.
// DeletionTimestamp is non-nil: DELETE.
// Fields other than status differ and DeletionTimestamp is nil: update
func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {

	// 1. this is a reconcile
	// TODO: it would be better to update the whole object and only preserve certain things
	//       like the source annotation or the UID (to ensure safety)
	if !podsDifferSemantically(existing, ref) {
		// this is not an update
		// Only check reconcile when it is not an update, because if the pod is going to
		// be updated, an extra reconcile is unnecessary
		if !reflect.DeepEqual(existing.Status, ref.Status) {
			// Pod with changed pod status needs reconcile, because kubelet should
			// be the source of truth of pod status.
			existing.Status = ref.Status
			needReconcile = true
		}
		return
	}

	// Overwrite the first-seen time with the existing one. This is our own
	// internal annotation, there is no need to update.
	ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]

	existing.Spec = ref.Spec
	existing.Labels = ref.Labels
	existing.DeletionTimestamp = ref.DeletionTimestamp
	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
	existing.Status = ref.Status
	updateAnnotations(existing, ref)

	// 2. this is an graceful delete
	if ref.DeletionTimestamp != nil {
		needGracefulDelete = true
	} else {
		// 3. this is an update
		needUpdate = true
	}

	return
}
  6. Start the kubelet and read pod information from the pod updates channel
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go wait.Until(func() {
		k.Run(podCfg.Updates())
	}, 0, wait.NeverStop)
// omitted ...
}

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// omitted ...
	kl.syncLoop(updates, kl)
}

// Everything seen from the apiserver, both listed and watched, is dispatched here and handled by the kubelet case by case
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
			// These are pods restored from the checkpoint. Treat them as new
			// pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}

		if u.Op != kubetypes.RESTORE {
			// If the update type is RESTORE, it means that the update is from
			// the pod checkpoints and may be incomplete. Do not mark the
			// source as ready.

			// Mark the source ready after receiving at least one update from the
			// source. Once all the sources are marked ready, various cleanup
			// routines will start reclaiming resources. It is important that this
			// takes place only after kubelet calls the update handler to process
			// the update to ensure the internal pod cache is up-to-date.
			kl.sourcesReady.AddSource(u.Source)
		}

	}
	return true
}
  7. To summarize

[x] list and watch pull pod information from the apiserver, update the local cache, and push the full cache to the source channel
[x] the pod source compares the pods in its own map with what the cache pushed, decides how to handle each pod, and delivers the result to the updates channel
[x] the kubelet reads the different kinds of updates from the updates channel and handles each accordingly
