- kube-scheduler Source Code Analysis (Part 1): NewSchedulerCommand
- 1. Main Function
- 2. NewSchedulerCommand
- 2.1. NewOptions
- 2.2. Options.Config
- 2.3. AddFlags
- 3. Run
- 3.1. NewSchedulerConfig
- 3.2. InformerFactory.Start
- 3.3. WaitForCacheSync
- 3.3.1. InformerFactory.WaitForCacheSync
- 3.3.2. controller.WaitForCacheSync
- 3.4. LeaderElection
- 3.5. Scheduler.Run
kube-scheduler Source Code Analysis (Part 1): NewSchedulerCommand
The following analysis is based on Kubernetes v1.12.0.
The directory structure of the scheduler's cmd code is as follows:
```
kube-scheduler
├── BUILD
├── OWNERS
├── app                            # objects used to run the scheduler
│   ├── BUILD
│   ├── config
│   │   ├── BUILD
│   │   └── config.go              # the scheduler's Config object
│   ├── options                    # the parameters used by the scheduler
│   │   ├── BUILD
│   │   ├── configfile.go
│   │   ├── deprecated.go
│   │   ├── deprecated_test.go
│   │   ├── insecure_serving.go
│   │   ├── insecure_serving_test.go
│   │   ├── options.go             # Options, NewOptions, AddFlags, Config, etc.
│   │   └── options_test.go
│   └── server.go                  # NewSchedulerCommand, NewSchedulerConfig, Run, etc.
└── scheduler.go                   # the main entry point
```
1. Main Function
The code for this part lives in /cmd/kube-scheduler/scheduler.go.
The entry point of kube-scheduler is the main function. Like the other Kubernetes components, it follows the same unified code style and uses the Cobra command-line framework.
```go
func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	command := app.NewSchedulerCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
	pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
```
Core code:

```go
// initialize the scheduler command struct
command := app.NewSchedulerCommand()
// execute the command
err := command.Execute()
```
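To make the Cobra pattern concrete, here is a minimal, self-contained sketch of the same structure: a `cobra.Command` with a `Run` field and a flag, executed via `Execute()`. The `demo` command and its `config` flag are hypothetical, not part of kube-scheduler.

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use:   "demo",
		Short: "A minimal Cobra command",
		Run: func(cmd *cobra.Command, args []string) {
			// The component's real work runs here, like kube-scheduler's Run field.
			configFile, _ := cmd.Flags().GetString("config")
			fmt.Println("running with config:", configFile)
		},
	}
	// Flags registered on the command are parsed automatically by Execute().
	cmd.Flags().String("config", "", "path to a configuration file")

	if err := cmd.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
```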
2. NewSchedulerCommand
The code for this part lives in /cmd/kube-scheduler/app/server.go.
NewSchedulerCommand constructs and initializes the scheduler's cobra.Command object with its default parameters:
```go
// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
	opts, err := options.NewOptions()
	if err != nil {
		glog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
		Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cmd.Flags())

			if len(args) != 0 {
				fmt.Fprint(os.Stderr, "arguments are not supported\n")
			}

			if errs := opts.Validate(); len(errs) > 0 {
				fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
				os.Exit(1)
			}

			if len(opts.WriteConfigTo) > 0 {
				if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
					fmt.Fprintf(os.Stderr, "%v\n", err)
					os.Exit(1)
				}
				glog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
				return
			}

			c, err := opts.Config()
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			stopCh := make(chan struct{})
			if err := Run(c.Complete(), stopCh); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}

	opts.AddFlags(cmd.Flags())
	cmd.MarkFlagFilename("config", "yaml", "yml", "json")

	return cmd
}
```
Core code:

```go
// construct the options
opts, err := options.NewOptions()
// initialize the config object
c, err := opts.Config()
// execute the Run function
err := Run(c.Complete(), stopCh)
// register the flags
opts.AddFlags(cmd.Flags())
```
2.1. NewOptions
NewOptions constructs the parameters and context used by the scheduler server; the core field is the KubeSchedulerConfiguration.

```go
opts, err := options.NewOptions()
```
NewOptions:
```go
// NewOptions returns default scheduler app options.
func NewOptions() (*Options, error) {
	cfg, err := newDefaultComponentConfig()
	if err != nil {
		return nil, err
	}

	hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
	if err != nil {
		return nil, err
	}

	o := &Options{
		ComponentConfig: *cfg,
		SecureServing:   nil, // TODO: enable with apiserveroptions.NewSecureServingOptions()
		CombinedInsecureServing: &CombinedInsecureServingOptions{
			Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{
				BindNetwork: "tcp",
			},
			Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{
				BindNetwork: "tcp",
			},
			BindPort:    hport,
			BindAddress: hhost,
		},
		Authentication: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthenticationOptions()
		Authorization:  nil, // TODO: enable with apiserveroptions.NewDelegatingAuthorizationOptions()
		Deprecated: &DeprecatedOptions{
			UseLegacyPolicyConfig:    false,
			PolicyConfigMapNamespace: metav1.NamespaceSystem,
		},
	}

	return o, nil
}
```
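For illustration, here is a minimal stdlib-based sketch of the kind of parsing splitHostIntPort performs on cfg.HealthzBindAddress (e.g. "0.0.0.0:10251", the default insecure healthz address). The function body below is a hypothetical stand-in, not the actual helper from the options package.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// splitHostIntPort is an illustrative stand-in: split "host:port" and
// convert the port to an int.
func splitHostIntPort(s string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(s)
	if err != nil {
		return "", 0, err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, err
	}
	return host, port, nil
}

func main() {
	host, port, err := splitHostIntPort("0.0.0.0:10251")
	fmt.Println(host, port, err) // 0.0.0.0 10251 <nil>
}
```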
2.2. Options.Config
Config initializes the scheduler's configuration object.

```go
c, err := opts.Config()
```
The Config function mainly does the following:
- Builds the scheduler client, leaderElectionClient, and eventClient.
- Creates the event recorder.
- Sets up leader election (if enabled).
- Creates the informer objects, chiefly via NewSharedInformerFactory and NewPodInformer.
The full code of Config is as follows:
```go
// Config return a scheduler config object
func (o *Options) Config() (*schedulerappconfig.Config, error) {
	c := &schedulerappconfig.Config{}
	if err := o.ApplyTo(c); err != nil {
		return nil, err
	}

	// prepare kube clients.
	client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
	if err != nil {
		return nil, err
	}

	// Prepare event clients.
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})

	// Set up leader election if enabled.
	var leaderElectionConfig *leaderelection.LeaderElectionConfig
	if c.ComponentConfig.LeaderElection.LeaderElect {
		leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
		if err != nil {
			return nil, err
		}
	}

	c.Client = client
	c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
	c.PodInformer = factory.NewPodInformer(client, 0)
	c.EventClient = eventClient
	c.Recorder = recorder
	c.Broadcaster = eventBroadcaster
	c.LeaderElection = leaderElectionConfig

	return c, nil
}
```
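As a hedged sketch of how a broadcaster/recorder pair like the one built in Config() is typically used, the snippet below wires a recorder to a logging sink and emits an event for a Pod. The "demo-scheduler" component name and the pod object are illustrative; the real scheduler attaches the broadcaster to the event client instead of a log function.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

func main() {
	broadcaster := record.NewBroadcaster()
	// Send events to a log function instead of the API server (illustrative sink).
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		fmt.Printf(format+"\n", args...)
	})
	recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "demo-scheduler"})

	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
	recorder.Eventf(pod, corev1.EventTypeNormal, "Scheduled", "bound %s to a node", pod.Name)

	// Events are delivered asynchronously; give the broadcaster a moment before exiting.
	time.Sleep(time.Second)
}
```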
2.3. AddFlags
AddFlags registers the scheduler's flags on the given flag set.

```go
opts.AddFlags(cmd.Flags())
```

The code of AddFlags is as follows:
```go
// AddFlags adds flags for the scheduler options.
func (o *Options) AddFlags(fs *pflag.FlagSet) {
	fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
	fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
	fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

	o.SecureServing.AddFlags(fs)
	o.CombinedInsecureServing.AddFlags(fs)
	o.Authentication.AddFlags(fs)
	o.Authorization.AddFlags(fs)
	o.Deprecated.AddFlags(fs, &o.ComponentConfig)

	leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, fs)
	utilfeature.DefaultFeatureGate.AddFlag(fs)
}
```
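The binding pattern is plain pflag: each flag writes directly into a field of the options struct, so opts is fully populated by the time Run is invoked. A minimal sketch with a hypothetical demoOptions struct:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// demoOptions is a hypothetical, trimmed-down stand-in for the real Options struct.
type demoOptions struct {
	ConfigFile string
	Master     string
}

func main() {
	o := &demoOptions{}
	fs := pflag.NewFlagSet("demo", pflag.ExitOnError)

	// Each flag writes straight into a struct field, exactly as AddFlags does above.
	fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
	fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server.")

	fs.Parse([]string{"--config=/etc/kube-scheduler.yaml"})
	fmt.Println(o.ConfigFile, o.Master)
}
```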
3. Run
The code for this part lives in /cmd/kube-scheduler/app/server.go.

```go
err := Run(c.Complete(), stopCh)
```
Run starts a long-running, non-exiting process that carries out the scheduler's work.
The Run function mainly does the following:
- Builds the scheduler struct from the scheduler config.
- Starts the event broadcaster, healthz server, and metrics server.
- Starts all informers and waits for their caches to sync before scheduling (the key step).
- Calls sched.Run() to execute the scheduling loop.
- If multiple schedulers are running and LeaderElect is enabled, performs leader election.
The key pieces of code are analyzed one by one below:
3.1. NewSchedulerConfig
NewSchedulerConfig initializes the SchedulerConfig (its internal logic will be analyzed in a dedicated follow-up article), which is then used to build the scheduler struct.
```go
// Build a scheduler config from the provided algorithm source.
schedulerConfig, err := NewSchedulerConfig(c)
if err != nil {
	return err
}

// Create the scheduler.
sched := scheduler.NewFromConfig(schedulerConfig)
```
3.2. InformerFactory.Start
Start the PodInformer, then start the InformerFactory. This logic belongs to client-go's informer mechanism, which is analyzed in detail in a separate article on the Informer mechanism.
```go
// Start all informers.
go c.PodInformer.Informer().Run(stopCh)
c.InformerFactory.Start(stopCh)
```
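For context, here is a minimal sketch of the client-go informer lifecycle the scheduler relies on: build a shared informer factory, register an informer by requesting its lister, start everything, and sync the caches. It assumes it runs inside a cluster (rest.InClusterConfig) and trims error handling; it illustrates the mechanism, not scheduler code.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig() // sketch only: assumes running inside a cluster
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	stopCh := make(chan struct{})
	defer close(stopCh)

	// resyncPeriod 0 mirrors informers.NewSharedInformerFactory(client, 0) in Config().
	factory := informers.NewSharedInformerFactory(client, 0)
	// Requesting a lister registers the pod informer with the factory.
	podLister := factory.Core().V1().Pods().Lister()

	factory.Start(stopCh)            // start all registered informers
	factory.WaitForCacheSync(stopCh) // block until the local caches are warm

	pods, _ := podLister.List(labels.Everything())
	fmt.Printf("local cache holds %d pods\n", len(pods))
}
```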
3.3. WaitForCacheSync
Wait for the caches to sync before scheduling.
```go
// Wait for all caches to sync before scheduling.
c.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)
```
3.3.1. InformerFactory.WaitForCacheSync
InformerFactory.WaitForCacheSync waits for the caches of all started informers to sync, so that the local store stays consistent with the latest state in etcd.
```go
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
	informers := func() map[reflect.Type]cache.SharedIndexInformer {
		f.lock.Lock()
		defer f.lock.Unlock()

		informers := map[reflect.Type]cache.SharedIndexInformer{}
		for informerType, informer := range f.informers {
			if f.startedInformers[informerType] {
				informers[informerType] = informer
			}
		}
		return informers
	}()

	res := map[reflect.Type]bool{}
	for informType, informer := range informers {
		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
	}
	return res
}
```
It then calls cache.WaitForCacheSync:
```go
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		glog.V(2).Infof("stop requested")
		return false
	}

	glog.V(4).Infof("caches populated")
	return true
}
```
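The heart of this function is wait.PollUntil: re-run the condition every syncedPollPeriod until every HasSynced function returns true or the stop channel closes. A tiny runnable sketch of that primitive, with a stand-in condition instead of a real HasSynced:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	start := time.Now()

	// Poll the condition every 100ms until it reports done or stopCh closes.
	err := wait.PollUntil(100*time.Millisecond, func() (bool, error) {
		// Stand-in for an informer's HasSynced: report success after 500ms.
		return time.Since(start) > 500*time.Millisecond, nil
	}, stopCh)

	fmt.Println("synced:", err == nil)
}
```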
3.3.2. controller.WaitForCacheSync
controller.WaitForCacheSync is a thin wrapper around cache.WaitForCacheSync that uses a controller name to log which controller is waiting for its caches to sync.
controller.WaitForCacheSync("scheduler", stop, s.PodInformer.Informer().HasSynced)
controller.WaitForCacheSync具体代码如下:
```go
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
	glog.Infof("Waiting for caches to sync for %s controller", controllerName)

	if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
		utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
		return false
	}

	glog.Infof("Caches are synced for %s controller", controllerName)
	return true
}
```
3.4. LeaderElection
If multiple schedulers are running and leader election is enabled, run the LeaderElector until it is done or exits.
```go
// If leader election is enabled, run via LeaderElector until done and exit.
if c.LeaderElection != nil {
	c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
		OnStartedLeading: run,
		OnStoppedLeading: func() {
			utilruntime.HandleError(fmt.Errorf("lost master"))
		},
	}
	leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)
	if err != nil {
		return fmt.Errorf("couldn't create leader elector: %v", err)
	}

	leaderElector.Run(ctx)

	return fmt.Errorf("lost lease")
}
```
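For a runnable picture of this pattern, the sketch below uses client-go's leader-election package directly. Note that the LeaseLock type is from newer client-go releases (v1.12 locked on an Endpoints object, set up via makeLeaderElectionConfig), and the namespace, lock name, and identity are illustrative; the Callbacks/Run flow, however, matches the snippet above.

```go
package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig() // sketch only: assumes running inside a cluster
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Illustrative lock: newer client-go releases lock on a coordination.k8s.io Lease.
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Namespace: "kube-system", Name: "demo-scheduler"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: "scheduler-1"},
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// The real scheduler starts sched.Run() here via the run closure.
				fmt.Println("became leader")
				<-ctx.Done()
			},
			OnStoppedLeading: func() {
				// Mirrors the "lost master" handling above.
				fmt.Println("lost leadership")
			},
		},
	})
}
```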
3.5. Scheduler.Run
Run prepares a reusable run closure that starts the scheduler and then blocks until its context is cancelled:

```go
// Prepare a reusable run function.
run := func(ctx context.Context) {
	sched.Run()
	<-ctx.Done()
}

ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()

go func() {
	select {
	case <-stopCh:
		cancel()
	case <-ctx.Done():
	}
}()

...

run(ctx)
```
Scheduler.Run first waits for the cache to sync, then starts the scheduling loop in a goroutine.
The code of Scheduler.Run is as follows:
```go
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
```
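The loop is driven by wait.Until: with a period of 0, scheduleOne is re-invoked back-to-back (one scheduling cycle per call) until StopEverything closes. A tiny self-contained sketch of that behavior, with a stand-in for scheduleOne:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stop) // plays the role of StopEverything
	}()

	cycles := 0
	// Period 0: re-run the function immediately after it returns, until stop closes.
	wait.Until(func() {
		cycles++ // stand-in for sched.scheduleOne: one scheduling cycle per call
		time.Sleep(10 * time.Millisecond)
	}, 0, stop)

	fmt.Println("ran", cycles, "scheduling cycles before stopping")
}
```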
This concludes the analysis of the code under /cmd/kube-scheduler. The scheduling logic that Scheduler.Run kicks off lives in pkg/scheduler/scheduler.go and will be analyzed in later articles.
References:
- https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kube-scheduler
- https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-scheduler/scheduler.go
- https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-scheduler/app/server.go
