Comments (2)
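To answer your first question:
shareInformers.Core().V1().Namespaces().Informer() goes through the methods below (the Pod informer is shown here; the Namespace informer follows the same pattern with &corev1.Namespace{}).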
// Create the informer
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	// If an informer for this type already exists, no new one is created
	if exists {
		return informer
	}
	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer
	return informer
}
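The caching behaviour of InformerFor can be illustrated with a small standalone sketch. This is not client-go code: toyFactory, toyInformer, Pod and Namespace are hypothetical stand-ins, and only the lookup keyed by reflect.Type is modelled.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// toyInformer is a hypothetical stand-in for cache.SharedIndexInformer.
type toyInformer struct{ kind string }

// toyFactory is a hypothetical model of the caching part of sharedInformerFactory.
type toyFactory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*toyInformer
	created   int // number of times a constructor actually ran
}

func newToyFactory() *toyFactory {
	return &toyFactory{informers: map[reflect.Type]*toyInformer{}}
}

// informerFor returns the cached informer for obj's type and calls newFunc
// only the first time that type is requested.
func (f *toyFactory) informerFor(obj interface{}, newFunc func() *toyInformer) *toyInformer {
	f.mu.Lock()
	defer f.mu.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.created++
	f.informers[t] = inf
	return inf
}

// Pod and Namespace are empty placeholder types used only as map keys.
type Pod struct{}
type Namespace struct{}

func main() {
	f := newToyFactory()
	newPod := func() *toyInformer { return &toyInformer{kind: "Pod"} }
	newNS := func() *toyInformer { return &toyInformer{kind: "Namespace"} }

	a := f.informerFor(&Pod{}, newPod)
	b := f.informerFor(&Pod{}, newPod) // same type: cached instance is returned
	c := f.informerFor(&Namespace{}, newNS)

	fmt.Println(a == b, a == c, f.created) // true false 2
}
```

Requesting the Pod informer twice yields the same pointer, which is why several controllers built on one factory end up sharing a single informer per type.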
from sig-kubernetes.
https://blog.ihypo.net/15763910382218.html may help your understanding.
NewSharedInformerFactoryWithOptions returns a SharedInformerFactory. It has an informers field of type map[reflect.Type]cache.SharedIndexInformer, which holds the informer for each resource type (this code is in staging/src/k8s.io/client-go/informers/factory.go).
// The sharedIndexInformer that backs each type
type sharedIndexInformer struct {
	indexer Indexer
	controller Controller // the controller here is worth noting
	/*
	type Controller interface {
		Run(stopCh <-chan struct{})
		HasSynced() bool
		LastSyncResourceVersion() string
	}
	*/
	processor *sharedProcessor
	cacheMutationDetector MutationDetector
	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType runtime.Object
	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock
	started, stopped bool
	startedLock sync.Mutex
	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}
type controller struct {
	config Config
	reflector *Reflector
	reflectorMutex sync.RWMutex
	clock clock.Clock
}
// The controller's Run actually calls the reflector's Run method. So, as I understand it,
// starting an informer calls the controller's Run, which in turn calls the reflector's Run.
// The informer's own Run is shown at the end.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
	defer wg.Wait()
	wg.StartWithChannel(stopCh, r.Run)
	wait.Until(c.processLoop, time.Second, stopCh)
}
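The shape of controller.Run (a reflector goroutine feeding a queue while processLoop drains it) can be sketched with the standard library alone. This is a toy model under simplifying assumptions: runController is a hypothetical name, a plain channel stands in for the DeltaFIFO, and the producer closes the queue when its input is exhausted, whereas the real queue is closed when stopCh closes.

```go
package main

import (
	"fmt"
	"sync"
)

// runController is a hypothetical, simplified model of controller.Run.
// A producer goroutine (the "reflector") pushes items into a queue, and the
// calling goroutine (the "processLoop") pops and handles them in order.
func runController(source []string, process func(string)) {
	queue := make(chan string) // stand-in for the DeltaFIFO

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(queue) // simplification: closing here ends the consumer loop
		for _, item := range source {
			queue <- item
		}
	}()

	// Consumer side: blocks until an item arrives, exits once the queue is closed.
	for item := range queue {
		process(item)
	}
	wg.Wait() // mirrors "defer wg.Wait()": do not return before the producer exits
}

func main() {
	events := []string{"add default/pod-a", "update default/pod-a", "delete default/pod-a"}
	runController(events, func(e string) {
		fmt.Println("processed:", e)
	})
}
```

The point of the split is that watching the API server and handling events proceed independently, with the queue as the only coupling between them.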
// Start initializes all requested informers.
// This code is in client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for informerType, informer := range f.informers {
		// If the entry for this informer is false (not started yet), !false is true
		if !f.startedInformers[informerType] {
			// Start this informer
			go informer.Run(stopCh)
			// Mark it as started
			f.startedInformers[informerType] = true
		}
	}
}
type SharedInformer interface {
	...
	Run(stopCh <-chan struct{})
	...
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
	...
	s.controller.Run(stopCh)
	// So this also calls the controller's Run. As I understand it, each type maps to
	// one informer, and the reflector is shared here.
}
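As for queues, the book also mentions the rate-limiting queue and the delaying queue in addition to the ordinary queue.
For this part I found client-go/util/workqueue/, which contains the queue code.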